ref: 9c25e3a38f465444d684a58a24f7567e6b59299c
dir: /plan9/drawserv.c/
/*
* drawserv_udp.c - Plan 9 draw server with UDP (9front)
*
* Based on working drawserv.c (TCP version), converted to UDP.
* Receives LZ4 compressed tiles, NO XOR delta.
*
* Modified: Accumulate tiles until FLUSH, keeping only most recent frame per tile.
*
* Build:
* 6c drawserv_udp.c lz4.c
* 6l -o drawserv_udp drawserv_udp.6 lz4.6
*/
#include <u.h>
#include <libc.h>
#include <draw.h>
#include <thread.h>
#include <mouse.h>
#include <keyboard.h>
#include "lz4.h"
/*
 * Protocol
 *
 * Every datagram starts with a P9WL_HDR_SIZE-byte header.  Multi-byte
 * fields are little-endian (see GET16/GET32/PUT16/PUT32).
 */
#define P9WL_MAGIC 0x50395749
#define P9WL_VERSION 1
#define P9WL_HDR_SIZE 20
#define P9WL_MTU 1400
/* Stride between consecutive chunks of a multi-chunk tile (see add_chunk) */
#define P9WL_MAX_CHUNK (P9WL_MTU - 44)
/* Message types (header byte HDR_TYPE) */
/* SIZE_REQ is answered with SIZE_RESP carrying the window size */
#define P9WL_SIZE_REQ 0x01
#define P9WL_SIZE_RESP 0x02
/* TILE carries pixel data; FLUSH draws everything accumulated so far */
#define P9WL_TILE 0x10
#define P9WL_FLUSH 0x15
/* NACK is defined but not referenced by the code in this file */
#define P9WL_NACK 0x16
/* SCROLL is accepted and ignored by handle_packet */
#define P9WL_SCROLL 0x17
/* MOUSE, KEY and RESIZE are sent by this server to the client */
#define P9WL_MOUSE 0x20
#define P9WL_KEY 0x21
#define P9WL_RESIZE 0x22
/* Flag bits (header byte HDR_FLAGS) */
/* Tile payload is LZ4-compressed */
#define P9WL_FLAG_COMPRESSED 0x01
/* Single-chunk tile is followed by a list of extra destination positions */
#define P9WL_FLAG_HAS_COPIES 0x02
/* Byte offsets of the header fields */
#define HDR_MAGIC 0
#define HDR_VERSION 4
#define HDR_TYPE 5
#define HDR_FLAGS 6
#define HDR_SCROLL_ID 7
#define HDR_SEQ 8
#define HDR_FRAME 12
#define HDR_TIMESTAMP 16
/*
 * Little-endian field access on uchar buffers.
 *
 * GET16/GET32 are expressions and evaluate p once per byte, so p must
 * be free of side effects.
 *
 * PUT16/PUT32 evaluate both arguments exactly once, so a call such as
 * PUT32(buf, seq++) stores one consistent value and bumps seq once.
 */
#define GET16(p) ((uint)(p)[0] | ((uint)(p)[1]<<8))
#define GET32(p) ((ulong)(p)[0] | ((ulong)(p)[1]<<8) | ((ulong)(p)[2]<<16) | ((ulong)(p)[3]<<24))
#define PUT16(p, v) do { uchar *p9wl_p_ = (p); uint p9wl_v_ = (v); p9wl_p_[0]=(uchar)p9wl_v_; p9wl_p_[1]=(uchar)(p9wl_v_>>8); } while(0)
#define PUT32(p, v) do { uchar *p9wl_p_ = (p); ulong p9wl_v_ = (v); p9wl_p_[0]=(uchar)p9wl_v_; p9wl_p_[1]=(uchar)(p9wl_v_>>8); p9wl_p_[2]=(uchar)(p9wl_v_>>16); p9wl_p_[3]=(uchar)(p9wl_v_>>24); } while(0)
/* Input devices, opened in threadmain */
Mousectl *mctl;
Keyboardctl *kctl;
/* libthread reads this to size the stack of the main thread */
int mainstacksize = 32768;
/* UDP */
/* fd of the connection's data file, opened by netproc; -1 until then */
int udpdata = -1;
/* 52-byte header taken from the first valid client packet; prepended to every reply */
uchar clienthdr[52];
/* set once clienthdr has been filled in */
int clientknown = 0;
/* taken around accesses to clienthdr */
Lock udplock;
/* sequence number placed in outgoing packet headers */
ulong sendseq = 0;
/* non-zero enables diagnostic prints on fd 2 */
int debug = 0;
/* Display */
/* scratch image: tiles are loaded into it, then drawn onto the screen */
Image *tileimg;
/* current dimensions of tileimg */
int tileimgw, tileimgh;
/*
 * Packet channel
 *
 * netproc fills a Packet per datagram and sends a pointer to it over
 * pktch; the main thread receives the pointers and handles them.
 */
typedef struct Packet Packet;
struct Packet {
uchar hdr[52]; /* first 52 bytes of the read, split off by netproc */
uchar data[2048]; /* read buffer; after the split it holds the payload only */
int len; /* payload length in bytes (read size minus 52) */
};
Channel *pktch;
/*
 * Tile reassembly for multi-chunk tiles
 *
 * A slot is in use while data is non-nil.  Slots are matched on
 * position, size and frame (see get_pending).
 */
typedef struct Pending Pending;
struct Pending {
int x, y, w, h; /* tile rectangle as announced in the tile packets */
int flags; /* header flags of the packet that created the slot */
int chunk_count; /* number of chunks the tile is split into */
int total_size; /* size in bytes of the reassembled payload */
uchar *data; /* reassembly buffer, total_size bytes */
uchar *got; /* one byte per chunk, non-zero once that chunk arrived */
int ngot; /* number of distinct chunks received so far */
ulong frame; /* frame number the tile belongs to */
vlong when; /* nsec() when the slot was claimed; oldest slot is evicted first */
};
#define MAXPENDING 64
Pending pending[MAXPENDING];
/* taken while pending[] slots are searched, claimed or released */
Lock pendlock;
/* Upper bound on copy destinations parsed from one tile packet */
#define MAX_COPIES 512
/* Counters for diagnostics */
long drop_pktch = 0; /* packets netproc could not queue on pktch */
long tiles_drawn = 0; /* tiles drawn by flush_accum */
long tiles_failed = 0; /* tiles that could not be decoded */
long tiles_superseded = 0; /* tiles dropped because a newer frame was already buffered */
ulong last_stats = 0; /* now_ms() when the counters were last reset */
/* Frame tile counting */
/* NOTE(review): not referenced by any function visible in this file */
ulong current_frame = 0;
/* Tile tracking: accum[] is a MAX_TILES_X by MAX_TILES_Y grid of TILE_SIZE-pixel cells */
#define TILE_SIZE 16
#define MAX_TILES_X 256
#define MAX_TILES_Y 256
/* Decode buffer shared by all tiles; grown on demand in handle_tile */
uchar *inline_decbuf = nil;
int inline_decbuf_size = 0;
/*
* Tile accumulation buffer
*
* Tiles are buffered here until FLUSH. Each slot holds decompressed
* pixels for one tile position (slot chosen by tile_index). If several
* tiles arrive for the same slot, one from an older frame never
* replaces a buffered newer one; on equal frame numbers the later
* arrival wins.
*/
typedef struct AccumTile AccumTile;
struct AccumTile {
int valid; /* non-zero while the slot holds a tile not yet flushed */
ulong frame; /* frame number of the buffered tile */
int x, y, w, h; /* position and size as passed to accum_tile */
uchar *pixels; /* Decompressed XBGR, w*h*4 bytes */
int pixelsize; /* Allocated size of pixels buffer */
};
#define MAX_ACCUM (MAX_TILES_X * MAX_TILES_Y)
AccumTile accum[MAX_ACCUM];
/*
 * Map a pixel position to its slot in accum[].
 *
 * The slot is the TILE_SIZE grid cell containing (x, y), numbered
 * row-major across a MAX_TILES_X by MAX_TILES_Y grid.
 * Returns -1 when the position lies outside that grid.
 */
int
tile_index(int x, int y)
{
	int tx, ty;

	/*
	 * Reject negatives before dividing: integer division truncates
	 * toward zero, so -15..-1 would otherwise land in cell 0.
	 */
	if(x < 0 || y < 0)
		return -1;
	tx = x / TILE_SIZE;
	ty = y / TILE_SIZE;
	if(tx >= MAX_TILES_X || ty >= MAX_TILES_Y)
		return -1;
	return ty * MAX_TILES_X + tx;
}
/*
 * Store a decompressed tile in the accumulation buffer.
 *
 * x, y    tile position; selects the slot via tile_index
 * w, h    tile size in pixels, must be positive
 * frame   frame number the tile belongs to
 * pixels  w*h*4 bytes of pixel data; copied, the caller keeps its buffer
 *
 * Returns 1 if stored.  Returns 0 if the arguments are unusable, the
 * position is off the grid, the slot already holds a newer frame, or
 * memory ran out.  On failure the slot is left exactly as it was.
 */
int
accum_tile(int x, int y, int w, int h, ulong frame, uchar *pixels)
{
	int idx, nbytes;
	AccumTile *t;
	uchar *buf;

	if(pixels == nil || w <= 0 || h <= 0)
		return 0;
	/* w*h*4 must be representable as an int */
	if(w > 0x7fffffff / 4 / h)
		return 0;
	idx = tile_index(x, y);
	if(idx < 0)
		return 0;
	t = &accum[idx];
	/* Only update if this frame is newer or equal */
	if(t->valid && frame < t->frame){
		tiles_superseded++;
		return 0;
	}
	nbytes = w * h * 4;
	if(t->pixels == nil || t->pixelsize < nbytes){
		/*
		 * Allocate before releasing the old buffer so a failed
		 * malloc cannot leave a valid slot pointing at nil.
		 */
		buf = malloc(nbytes);
		if(buf == nil)
			return 0;
		free(t->pixels);
		t->pixels = buf;
		t->pixelsize = nbytes;
	}
	memmove(t->pixels, pixels, nbytes);
	t->x = x;
	t->y = y;
	t->w = w;
	t->h = h;
	t->frame = frame;
	t->valid = 1;
	return 1;
}
/*
 * Draw all accumulated tiles to screen, then clear the buffer.
 *
 * Every valid slot is consumed by the flush, whether or not it could
 * be drawn.  Each tile is loaded into the scratch image tileimg and
 * drawn at its position relative to the window origin.  tiles_drawn
 * is advanced by the number of tiles drawn, and the display is
 * flushed once at the end.
 */
void
flush_accum(void)
{
	int i, count, needw, needh;
	AccumTile *t;
	Rectangle rect, dst;

	count = 0;
	for(i = 0; i < MAX_ACCUM; i++){
		t = &accum[i];
		if(!t->valid)
			continue;
		t->valid = 0;
		/* never hand loadimage a missing buffer or an empty rectangle */
		if(t->pixels == nil || t->w <= 0 || t->h <= 0)
			continue;
		/* Ensure tileimg is big enough */
		if(tileimg == nil || t->w > tileimgw || t->h > tileimgh){
			needw = (t->w + 63) & ~63;
			needh = (t->h + 63) & ~63;
			if(tileimg != nil){
				/*
				 * Grow only: keep the larger of old and new in each
				 * dimension so alternating wide and tall tiles do
				 * not reallocate the image over and over.
				 */
				if(needw < tileimgw)
					needw = tileimgw;
				if(needh < tileimgh)
					needh = tileimgh;
				freeimage(tileimg);
			}
			tileimg = allocimage(display, Rect(0, 0, needw, needh), XBGR32, 0, DNofill);
			if(tileimg == nil){
				fprint(2, "allocimage %dx%d failed: %r\n", needw, needh);
				tileimgw = tileimgh = 0;
				continue;
			}
			tileimgw = needw;
			tileimgh = needh;
		}
		rect = Rect(0, 0, t->w, t->h);
		if(loadimage(tileimg, rect, t->pixels, t->w * t->h * 4) < 0){
			fprint(2, "loadimage failed: %r\n");
			continue;
		}
		dst.min = addpt(screen->r.min, Pt(t->x, t->y));
		dst.max = addpt(dst.min, Pt(t->w, t->h));
		draw(screen, dst, tileimg, nil, rect.min);
		count++;
	}
	tiles_drawn += count;
	flushimage(display, 1);
}
ulong
now_ms(void)
{
return nsec() / 1000000;
}
/*
 * Send one protocol packet to the client.
 *
 * pkt, len  packet bytes, without the 52-byte address header
 *
 * The saved client header is prepended and the result is written to
 * the UDP data file as a single datagram.  Nothing is sent until the
 * data file is open and a client is known, or when len is negative or
 * does not fit the local buffer.  Delivery is best effort; a failed
 * write is only reported when debug is set.
 */
void
udpsend(uchar *pkt, int len)
{
	uchar buf[2048];
	int total;

	if(udpdata < 0 || !clientknown)
		return;
	if(pkt == nil || len < 0 || len > (int)sizeof(buf) - 52)
		return;
	total = 52 + len;
	/* udplock only covers clienthdr, so drop it before the system call */
	lock(&udplock);
	memmove(buf, clienthdr, 52);
	unlock(&udplock);
	memmove(buf + 52, pkt, len);
	if(write(udpdata, buf, total) != total && debug)
		fprint(2, "udpsend: write %d bytes: %r\n", total);
}
void
send_size_resp(void)
{
uchar pkt[24];
PUT32(pkt + HDR_MAGIC, P9WL_MAGIC);
pkt[HDR_VERSION] = P9WL_VERSION;
pkt[HDR_TYPE] = P9WL_SIZE_RESP;
pkt[HDR_FLAGS] = 0;
pkt[7] = 0;
PUT32(pkt + HDR_SEQ, sendseq++);
PUT32(pkt + HDR_FRAME, 0);
PUT32(pkt + HDR_TIMESTAMP, now_ms());
PUT16(pkt + 20, Dx(screen->r));
PUT16(pkt + 22, Dy(screen->r));
//fprint(2, "SIZE_RESP: %dx%d\n", Dx(screen->r), Dy(screen->r));
udpsend(pkt, 24);
}
void
send_mouse(int x, int y, int b)
{
uchar pkt[28];
if(!clientknown)
return;
PUT32(pkt + HDR_MAGIC, P9WL_MAGIC);
pkt[HDR_VERSION] = P9WL_VERSION;
pkt[HDR_TYPE] = P9WL_MOUSE;
pkt[HDR_FLAGS] = 0;
pkt[7] = 0;
PUT32(pkt + HDR_SEQ, sendseq++);
PUT32(pkt + HDR_FRAME, 0);
PUT32(pkt + HDR_TIMESTAMP, now_ms());
PUT16(pkt + 20, x);
PUT16(pkt + 22, y);
pkt[24] = b;
pkt[25] = pkt[26] = pkt[27] = 0;
udpsend(pkt, 28);
}
/*
 * Send a KEY packet for rune k.
 *
 * The rune is sent as a 32-bit field after the header, followed by a
 * byte that is always 1 and three zero bytes.
 * Does nothing until a client is known.
 */
void
send_key(Rune k)
{
	uchar pkt[28];
	ulong seq, stamp;

	if(!clientknown)
		return;
	/*
	 * Evaluate these once up front: passing sendseq++ or now_ms()
	 * straight to a macro that expands its argument per byte would
	 * repeat the side effect and mix bytes of different values.
	 */
	seq = sendseq++;
	stamp = now_ms();
	PUT32(pkt + HDR_MAGIC, P9WL_MAGIC);
	pkt[HDR_VERSION] = P9WL_VERSION;
	pkt[HDR_TYPE] = P9WL_KEY;
	pkt[HDR_FLAGS] = 0;
	pkt[HDR_SCROLL_ID] = 0;
	PUT32(pkt + HDR_SEQ, seq);
	PUT32(pkt + HDR_FRAME, 0);
	PUT32(pkt + HDR_TIMESTAMP, stamp);
	PUT32(pkt + 20, k);
	pkt[24] = 1;
	pkt[25] = pkt[26] = pkt[27] = 0;
	udpsend(pkt, 28);
}
void
send_resize(int w, int h)
{
uchar pkt[24];
int i;
if(!clientknown)
return;
PUT32(pkt + HDR_MAGIC, P9WL_MAGIC);
pkt[HDR_VERSION] = P9WL_VERSION;
pkt[HDR_TYPE] = P9WL_RESIZE;
pkt[HDR_FLAGS] = 0;
pkt[7] = 0;
PUT32(pkt + HDR_SEQ, sendseq++);
PUT32(pkt + HDR_FRAME, 0);
PUT32(pkt + HDR_TIMESTAMP, now_ms());
PUT16(pkt + 20, w);
PUT16(pkt + 22, h);
for(i = 0; i < 5; i++){
udpsend(pkt, 24);
sleep(20);
}
}
/*
 * Find or create the reassembly slot for a multi-chunk tile.
 *
 * x, y, w, h   tile rectangle
 * chunk_count  number of chunks the tile is split into
 * total_size   size in bytes of the reassembled payload
 * flags        header flags, remembered for decoding
 * frame        frame number of the tile
 *
 * An in-use slot with the same rectangle and frame is returned as is.
 * Otherwise a free slot is claimed, or the slot with the oldest claim
 * time is evicted, and fresh buffers are allocated for it.
 *
 * Returns nil when the sizes cannot describe a real tile or when
 * memory runs out; in the latter case the claimed slot is left empty.
 */
Pending*
get_pending(int x, int y, int w, int h, int chunk_count, int total_size, int flags, ulong frame)
{
	Pending *p, *victim;
	vlong now;
	int i;

	if(chunk_count <= 0 || total_size <= 0)
		return nil;
	/*
	 * Chunks are stored at multiples of P9WL_MAX_CHUNK, so a payload
	 * larger than chunk_count strides could never be filled completely.
	 */
	if((total_size - 1) / P9WL_MAX_CHUNK >= chunk_count)
		return nil;
	now = nsec();
	lock(&pendlock);
	/* Find existing */
	for(i = 0; i < MAXPENDING; i++){
		p = &pending[i];
		if(p->data && p->x == x && p->y == y && p->w == w && p->h == h && p->frame == frame){
			unlock(&pendlock);
			return p;
		}
	}
	/* Find free slot (or oldest) */
	victim = &pending[0];
	for(i = 0; i < MAXPENDING; i++){
		p = &pending[i];
		if(p->data == nil){
			victim = p;
			break;
		}
		if(p->when < victim->when)
			victim = p;
	}
	p = victim;
	free(p->data);
	free(p->got);
	p->data = malloc(total_size);
	p->got = malloc(chunk_count);
	if(p->data == nil || p->got == nil){
		/* leave a clean empty slot rather than a half-allocated one */
		free(p->data);
		free(p->got);
		p->data = nil;
		p->got = nil;
		unlock(&pendlock);
		return nil;
	}
	memset(p->got, 0, chunk_count);
	p->x = x;
	p->y = y;
	p->w = w;
	p->h = h;
	p->flags = flags;
	p->chunk_count = chunk_count;
	p->total_size = total_size;
	p->ngot = 0;
	p->frame = frame;
	p->when = now;
	unlock(&pendlock);
	return p;
}
/*
 * Copy one chunk into a pending tile's reassembly buffer.
 *
 * p     slot obtained from get_pending
 * idx   chunk index; the chunk is stored at idx * P9WL_MAX_CHUNK
 * data  chunk payload
 * len   payload length; clamped so the copy ends at total_size
 *
 * Returns 1 once every chunk of the tile has been received, else 0.
 * Duplicate chunks, and chunks that cannot belong to the tile, are
 * ignored and return 0.
 */
int
add_chunk(Pending *p, int idx, uchar *data, int len)
{
	int offset, room;

	if(p == nil || p->data == nil || p->got == nil)
		return 0;
	if(data == nil || len < 0)
		return 0;
	if(idx < 0 || idx >= p->chunk_count)
		return 0;
	if(p->got[idx])
		return 0;
	/*
	 * A chunk that would start past the end of the buffer is bogus.
	 * Test by division so the offset below cannot overflow, and so
	 * the clamped length can never go negative.
	 */
	if(p->total_size < 0 || idx > p->total_size / P9WL_MAX_CHUNK)
		return 0;
	offset = idx * P9WL_MAX_CHUNK;
	room = p->total_size - offset;
	if(len > room)
		len = room;
	memmove(p->data + offset, data, len);
	p->got[idx] = 1;
	p->ngot++;
	return p->ngot >= p->chunk_count;
}
void
handle_tile(uchar *pkt, int len, int flags, ulong frame)
{
int x, y, w, h;
int chunk_idx, chunk_count;
int total_size, chunk_size;
uchar *data;
Pending *p;
int num_copies, i;
int copy_offset;
if(len < 40)
return;
x = GET16(pkt + 20);
y = GET16(pkt + 22);
w = GET16(pkt + 24);
h = GET16(pkt + 26);
chunk_idx = GET16(pkt + 28);
chunk_count = GET16(pkt + 30);
total_size = GET32(pkt + 32);
chunk_size = GET32(pkt + 36);
data = pkt + 40;
if(len - 40 < (int)chunk_size)
return;
/* Single chunk - decompress and accumulate */
if(chunk_count == 1){
int nbytes = w * h * 4;
int declen;
int copy_dx[MAX_COPIES], copy_dy[MAX_COPIES];
int nc = 0;
/* Parse copy destinations first */
if(flags & P9WL_FLAG_HAS_COPIES){
copy_offset = 40 + chunk_size;
if(len >= copy_offset + 2){
num_copies = GET16(pkt + copy_offset);
if(num_copies > MAX_COPIES) num_copies = MAX_COPIES;
for(i = 0; i < num_copies && copy_offset + 2 + (i+1)*4 <= len; i++){
copy_dx[i] = GET16(pkt + copy_offset + 2 + i*4);
copy_dy[i] = GET16(pkt + copy_offset + 2 + i*4 + 2);
}
nc = i;
}
}
/* Ensure decode buffer is large enough */
if(nbytes > inline_decbuf_size){
free(inline_decbuf);
inline_decbuf = malloc(nbytes);
inline_decbuf_size = inline_decbuf ? nbytes : 0;
}
if(inline_decbuf == nil){
tiles_failed++;
return;
}
/* Decompress */
if(flags & P9WL_FLAG_COMPRESSED){
declen = LZ4_decompress_safe((char*)data, (char*)inline_decbuf, chunk_size, nbytes);
if(declen != nbytes){
tiles_failed++;
return;
}
} else {
if((int)chunk_size != nbytes){
tiles_failed++;
return;
}
memmove(inline_decbuf, data, nbytes);
}
/* Accumulate primary tile */
accum_tile(x, y, w, h, frame, inline_decbuf);
/* Accumulate copies (same pixels, different positions) */
for(i = 0; i < nc; i++){
accum_tile(copy_dx[i], copy_dy[i], w, h, frame, inline_decbuf);
}
return;
}
/* Multi-chunk - reassemble first */
p = get_pending(x, y, w, h, chunk_count, total_size, flags, frame);
if(p == nil)
return;
if(!add_chunk(p, chunk_idx, data, chunk_size))
return;
/* Complete - decompress and accumulate */
{
int nbytes = w * h * 4;
int declen;
uchar *tiledata = p->data;
int tilesize = p->total_size;
int tileflags = p->flags;
/* Ensure decode buffer is large enough */
if(nbytes > inline_decbuf_size){
free(inline_decbuf);
inline_decbuf = malloc(nbytes);
inline_decbuf_size = inline_decbuf ? nbytes : 0;
}
lock(&pendlock);
free(p->got);
p->got = nil;
p->data = nil;
unlock(&pendlock);
if(inline_decbuf == nil){
tiles_failed++;
free(tiledata);
return;
}
/* Decompress */
if(tileflags & P9WL_FLAG_COMPRESSED){
declen = LZ4_decompress_safe((char*)tiledata, (char*)inline_decbuf, tilesize, nbytes);
free(tiledata);
if(declen != nbytes){
tiles_failed++;
return;
}
} else {
if(tilesize != nbytes){
tiles_failed++;
free(tiledata);
return;
}
memmove(inline_decbuf, tiledata, nbytes);
free(tiledata);
}
/* Accumulate tile */
accum_tile(x, y, w, h, frame, inline_decbuf);
}
}
/*
 * Validate and dispatch one received packet.
 *
 * pkt, len  packet bytes, starting at the protocol header
 * hdr       52-byte header that accompanied the packet
 *
 * Packets that are too short or carry the wrong magic or version are
 * dropped silently.  The header of the first accepted packet is saved
 * as the client address.  Sequence numbers are tracked to report gaps
 * when debug is set.  SIZE_REQ, TILE and FLUSH are acted on; every
 * other type, SCROLL included, is ignored.
 */
void
handle_packet(uchar *pkt, int len, uchar *hdr)
{
	static ulong prev_seq = 0;
	static int have_seq = 0;
	static long lost_total = 0;
	static long pkts_total = 0;
	ulong frame, seq;
	int type, flags;
	long lost;

	if(len < P9WL_HDR_SIZE)
		return;
	if(GET32(pkt + HDR_MAGIC) != P9WL_MAGIC || pkt[HDR_VERSION] != P9WL_VERSION)
		return;

	/* Remember where the first client is */
	if(!clientknown){
		lock(&udplock);
		memmove(clienthdr, hdr, 52);
		clientknown = 1;
		unlock(&udplock);
		if(debug)
			fprint(2, "client connected\n");
	}

	type = pkt[HDR_TYPE];
	flags = pkt[HDR_FLAGS];
	frame = GET32(pkt + HDR_FRAME);
	seq = GET32(pkt + HDR_SEQ);

	/* Count packets missing between the previous sequence number and this one */
	pkts_total++;
	if(have_seq && seq > prev_seq && seq != prev_seq + 1){
		lost = seq - prev_seq - 1;
		lost_total += lost;
		if(debug)
			fprint(2, "SEQ GAP: expected %uld got %uld (lost %ld, total lost %ld/%ld)\n",
				prev_seq + 1, seq, lost, lost_total, pkts_total);
	}
	prev_seq = seq;
	have_seq = 1;

	if(type == P9WL_SIZE_REQ)
		send_size_resp();
	else if(type == P9WL_TILE)
		handle_tile(pkt, len, flags, frame);
	else if(type == P9WL_FLUSH)
		flush_accum();	/* draw all accumulated tiles */
	/* P9WL_SCROLL is disabled; it and all other types are ignored */
}
/*
 * Network receive proc.
 *
 * arg  UDP port to announce, as a string
 *
 * Clones a UDP connection, turns on "headers" mode, announces the
 * port and opens the data file (published in udpdata).  It then reads
 * datagrams forever: the leading 52 bytes of each read are split off
 * into Packet.hdr, the rest becomes the payload, and a pointer to the
 * Packet is queued on pktch.  Reads of 52 bytes or fewer are skipped.
 * When the channel is full the packet is dropped and counted in
 * drop_pktch.  Setup failures end the whole program.
 */
void
netproc(void *arg)
{
	char *port = arg;
	char buf[64], ldir[40], ctlpath[40];
	int n, acfd, ctlfd;
	Packet *p;
	/*
	 * Packets are handed out from a ring.  pktch is created with room
	 * for 8192 pointers; one more packet can be in the consumer's
	 * hands and one is being filled here.  With two slots beyond the
	 * channel capacity, and the ring advanced only after a successful
	 * send, the slot being filled is never one the consumer can still
	 * see.
	 */
#define PACKET_POOL_SIZE (8192 + 2)
	static Packet packet_pool[PACKET_POOL_SIZE];
	static int pool_idx = 0;

	acfd = open("/net/udp/clone", ORDWR);
	if(acfd < 0){
		fprint(2, "open /net/udp/clone: %r\n");
		threadexitsall("udp");
	}
	/* reading the clone file yields the connection number */
	n = read(acfd, buf, sizeof(buf)-1);
	if(n <= 0){
		fprint(2, "read clone: %r\n");
		threadexitsall("udp");
	}
	buf[n] = 0;
	/* Try to increase UDP receive buffer; failure is not an error */
	snprint(ctlpath, sizeof(ctlpath), "/net/udp/%s/ctl", buf);
	ctlfd = open(ctlpath, OWRITE);
	if(ctlfd >= 0){
		fprint(ctlfd, "rcvbuf 4194304");
		close(ctlfd);
	}
	snprint(ldir, sizeof(ldir), "/net/udp/%s/data", buf);
	if(fprint(acfd, "headers") < 0){
		fprint(2, "headers: %r\n");
		threadexitsall("udp");
	}
	if(fprint(acfd, "announce %s", port) < 0){
		fprint(2, "announce %s: %r\n", port);
		threadexitsall("udp");
	}
	/* acfd is deliberately left open for the life of the proc */
	udpdata = open(ldir, ORDWR);
	if(udpdata < 0){
		fprint(2, "open %s: %r\n", ldir);
		threadexitsall("udp");
	}
	if(debug)
		fprint(2, "listening on UDP port %s\n", port);
	for(;;){
		p = &packet_pool[pool_idx];
		n = read(udpdata, p->data, sizeof(p->data));
		if(n <= 52)
			continue;
		memmove(p->hdr, p->data, 52);
		p->len = n - 52;
		memmove(p->data, p->data + 52, p->len);
		if(nbsend(pktch, &p) == 1){
			/* slot now belongs to the consumer; move on */
			pool_idx = (pool_idx + 1) % PACKET_POOL_SIZE;
		}else{
			/* not queued: count the drop and refill the same slot */
			drop_pktch++;
		}
	}
}
void
threadmain(int argc, char *argv[])
{
char *port;
Mouse m;
Rune k;
Packet *pkt;
Alt alts[5];
int i;
port = "5556";
if(argc > 1)
port = argv[1];
if(initdraw(nil, nil, "drawserv") < 0)
sysfatal("initdraw: %r");
mctl = initmouse(nil, screen);
if(mctl == nil)
sysfatal("initmouse: %r");
kctl = initkeyboard(nil);
if(kctl == nil)
sysfatal("initkeyboard: %r");
pktch = chancreate(sizeof(Packet*), 8192);
for(i = 0; i < MAXPENDING; i++){
pending[i].data = nil;
pending[i].got = nil;
}
/* Initialize accumulation buffer */
for(i = 0; i < MAX_ACCUM; i++){
accum[i].valid = 0;
accum[i].pixels = nil;
accum[i].pixelsize = 0;
}
draw(screen, screen->r, display->black, nil, ZP);
flushimage(display, 1);
if(debug)
fprint(2, "drawserv_udp %dx%d\n", Dx(screen->r), Dy(screen->r));
proccreate(netproc, port, 32768);
alts[0].c = mctl->c;
alts[0].v = &m;
alts[0].op = CHANRCV;
alts[1].c = mctl->resizec;
alts[1].v = nil;
alts[1].op = CHANRCV;
alts[2].c = kctl->c;
alts[2].v = &k;
alts[2].op = CHANRCV;
alts[3].c = pktch;
alts[3].v = &pkt;
alts[3].op = CHANRCV;
alts[4].op = CHANEND;
last_stats = now_ms();
for(;;){
/* Print stats every 5 seconds */
ulong now = now_ms();
if(now - last_stats > 5000){
if(drop_pktch || tiles_superseded)
if(debug)
fprint(2, "stats: drops=%ld superseded=%ld drawn=%ld\n",
drop_pktch, tiles_superseded, tiles_drawn);
drop_pktch = 0;
tiles_superseded = 0;
tiles_drawn = 0;
last_stats = now;
}
switch(alt(alts)){
case 0: /* mouse */
send_mouse(m.xy.x - screen->r.min.x,
m.xy.y - screen->r.min.y,
m.buttons);
break;
case 1: /* resize */
if(getwindow(display, Refnone) < 0)
sysfatal("getwindow: %r");
if(tileimg){
freeimage(tileimg);
tileimg = nil;
tileimgw = tileimgh = 0;
}
draw(screen, screen->r, display->black, nil, ZP);
flushimage(display, 1);
if(debug)
fprint(2, "resize: %dx%d\n", Dx(screen->r), Dy(screen->r));
send_resize(Dx(screen->r), Dy(screen->r));
break;
case 2: /* keyboard */
if(k == Kdel)
threadexitsall(nil);
send_key(k);
break;
case 3: /* network packet */
handle_packet(pkt->data, pkt->len, pkt->hdr);
/* Drain ALL pending packets before checking other channels */
while(nbrecv(pktch, &pkt) > 0){
handle_packet(pkt->data, pkt->len, pkt->hdr);
}
break;
}
}
}