cl-rw

Layered streams for Common Lisp
git clone https://logand.com/git/cl-rw.git/
Log | Files | Refs

commit c87dae16f8fb32057362822d7cc15cac903f125c
parent de3cd60876ad2961e3063450b791d2b505a100b0
Author: Tomas Hlavaty <tom@logand.com>
Date:   Sun, 13 Mar 2016 18:01:41 +0100

less macros more thunks

Diffstat:
Mconcurrency.lisp | 94+++++++++++++++++++++++++++++++++++++++++--------------------------------------
Mdns.lisp | 29++++++++++++++++-------------
Mos.lisp | 13++++++-------
Msocket.lisp | 11++++-------
Mui.lisp | 58+++++++++++++++++++++++++++++++---------------------------
5 files changed, 106 insertions(+), 99 deletions(-)

diff --git a/concurrency.lisp b/concurrency.lisp @@ -22,27 +22,27 @@ (defpackage :rw.concurrency (:use :cl) - (:export :with-lock + (:export :make-concurrent-queue :make-lock + :make-program-server :make-semaphore - :signal-semaphore - :wait-on-semaphore - :make-concurrent-queue :make-thread - :make-program-server - :threads-supported-p)) + :signal-semaphore + :threads-supported-p + :using-lock + :wait-on-semaphore)) (in-package :rw.concurrency) -(defmacro with-lock ((lock) &body body) +(defun using-lock (lock thunk) #-(or allegro ccl ecl mkcl cmucl sbcl) - (error "RW.CONCURRENCY:WITH-LOCK not ported") - #+allegro `(mp:with-process-lock (,lock) ,@body) - #+ccl `(ccl:with-lock-grabbed (,lock) ,@body) - #+ecl `(mp:with-lock (,lock) ,@body) - #+mkcl `(mt:with-lock (,lock) ,@body) - #+cmucl `(mp:with-lock-held (,lock) ,@body) - #+sbcl `(sb-concurrency::with-mutex (,lock) ,@body)) + (error "RW.CONCURRENCY:USING-LOCK not ported") + #+allegro (mp:with-process-lock (lock) (funcall thunk)) + #+ccl (ccl:with-lock-grabbed (lock) (funcall thunk)) + #+ecl (mp:with-lock (lock) (funcall thunk)) + #+mkcl (mt:with-lock (lock) (funcall thunk)) + #+cmucl (mp:with-lock-held (lock) (funcall thunk)) + #+sbcl (sb-concurrency::with-mutex (lock) (funcall thunk))) (defun make-lock (name) #-(or allegro ccl ecl mkcl cmucl sbcl) @@ -88,20 +88,22 @@ (lambda (&optional (value nil valuep)) (if valuep (let ((y (cons value nil))) - (with-lock (l) - (setf (cdar x) y - (car x) y) - (signal-semaphore s)) + (using-lock l + (lambda () + (setf (cdar x) y + (car x) y) + (signal-semaphore s))) value) (do (done z) (done z) (wait-on-semaphore s) - (with-lock (l) - (unless (eq x (car x)) - (setq done t - z (pop (cdr x))) - (unless (cdr x) - (setf (car x) x))))))))) + (using-lock l + (lambda () + (unless (eq x (car x)) + (setq done t + z (pop (cdr x))) + (unless (cdr x) + (setf (car x) x)))))))))) ;; (setq q (make-concurrent-queue)) ;; (funcall q 1) @@ -113,16 +115,16 @@ #+(or allegro ccl ecl mkcl cmucl sb-thread (and clisp mt)) t) -(defun make-thread (name fn) +(defun make-thread (name thunk) #-(or allegro ccl ecl mkcl cmucl sbcl (and clisp mt)) (error "RW.CONCURRENCY:MAKE-THREAD not ported") - #+allegro (mp:process-run-function name fn) - #+ccl (ccl:process-run-function name fn) - #+ecl (mp:process-run-function name fn) - #+mkcl (mt:thread-run-function name fn) - #+cmucl (mp:make-process fn :name name) - #+sbcl (sb-concurrency::make-thread fn :name (string name)) - #+(and clisp mt) (mt:make-thread fn :name name)) + #+allegro (mp:process-run-function name thunk) + #+ccl (ccl:process-run-function name thunk) + #+ecl (mp:process-run-function name thunk) + #+mkcl (mt:thread-run-function name thunk) + #+cmucl (mp:make-process thunk :name name) + #+sbcl (sb-concurrency::make-thread thunk :name (string name)) + #+(and clisp mt) (mt:make-thread thunk :name name)) (defun make-program-server (command args writer reader) (let ((p (rw.os:make-program :stream :stream command args nil)) @@ -137,16 +139,18 @@ (let ((l (make-lock 'program-server-lock)) (s (funcall p :output-stream))) (lambda (query) - (with-lock (l) - (when wq - (cond - (query - (funcall wq query) - (funcall reader s)) - (t - (funcall wq nil) - (setq wq nil) - (funcall p :wait) - (multiple-value-bind (status code) (funcall p :status-and-code) - (assert (eq :exited status)) - (assert (zerop code))))))))))) + (using-lock l + (lambda () + (when wq + (cond + (query + (funcall wq query) + (funcall reader s)) + (t + (funcall wq nil) + (setq wq nil) + (funcall p :wait) + (multiple-value-bind (status code) + (funcall p :status-and-code) + (assert (eq :exited status)) + (assert (zerop code)))))))))))) diff --git a/dns.lisp b/dns.lisp @@ -282,19 +282,22 @@ ($resource additional :size nadditional)) (defun udp (buf server port) - (rw.socket:with-socket (s (rw.socket:make-udp-socket)) - (rw.socket:udp-send s buf (length buf) - :remote-host server - :remote-port port) - (let ((n (array-total-size buf))) - (setf (fill-pointer buf) n) - (multiple-value-bind (b len addr) (rw.socket:udp-receive s buf n) - (declare (ignore addr)) - ;;(print (list :@@@ (subseq b 0 len))) - (flet ((cb (pos) - (next-$name (rw:skip (rw:reader b) pos)))) - (let ((*name-from-position* #'cb)) - (next-$message (rw:shorter-reader (rw:reader b) len)))))))) + (let ((s (rw.socket:make-udp-socket))) + (rw.socket:using-socket + s + (lambda () + (rw.socket:udp-send s buf (length buf) + :remote-host server + :remote-port port) + (let ((n (array-total-size buf))) + (setf (fill-pointer buf) n) + (multiple-value-bind (b len addr) (rw.socket:udp-receive s buf n) + (declare (ignore addr)) + ;;(print (list :@@@ (subseq b 0 len))) + (flet ((cb (pos) + (next-$name (rw:skip (rw:reader b) pos)))) + (let ((*name-from-position* #'cb)) + (next-$message (rw:shorter-reader (rw:reader b) len)))))))))) (defun query1 (name server &key (type 'A) (class 'IN) (port 53)) (let* ((n 512) ;; TODO minus IP/UDP headers diff --git a/os.lisp b/os.lisp @@ -29,7 +29,7 @@ :md5sum :run-command :sha1sum - :with-flock + :using-flock :with-program-io :with-program-output :with-temporary-file)) @@ -336,12 +336,11 @@ (-1 (error "flock ~s ~s ~s failed with code ~s" stream operation blockp (sb-alien:get-errno))))) -(defun call-with-flock (pathname shared fn) +(defun using-flock (pathname sharedp if-does-not-exist thunk) + (when (eq :create if-does-not-exist) + (open pathname :direction :probe :if-does-not-exist :create)) (with-open-file (s pathname :direction :output :if-exists :overwrite) - (flock s (if shared :shared :exclusive) t) - (funcall fn))) - -(defmacro with-flock ((pathname &key shared) &body body) - `(call-with-flock ,pathname ,shared (lambda () ,@body))) + (flock s (if sharedp :shared :exclusive) t) + (funcall thunk))) diff --git a/socket.lisp b/socket.lisp @@ -30,12 +30,12 @@ :ipv6-address-string :make-ipv4-address :make-ipv6-address - :make-tcp-server-socket :make-tcp-client-socket + :make-tcp-server-socket :make-udp-socket :udp-receive :udp-send - :with-socket)) + :using-socket)) (in-package :rw.socket) @@ -268,13 +268,10 @@ #+(or ecl sbcl mkcl) (sb-bsd-sockets:socket-close socket)) -(defun call-with-socket (socket fn) - (unwind-protect (funcall fn socket) +(defun using-socket (socket thunk) + (unwind-protect (funcall thunk) (close-socket socket))) -(defmacro with-socket ((var socket) &body body) - `(call-with-socket ,socket (lambda (,var) ,@body))) - (defun make-tcp-server-socket (local-host local-port &key backlog) #-(or allegro clisp sbcl ecl mkcl cmucl ccl) (error "RW.SOCKET:MAKE-TCP-SERVER-SOCKET not ported") diff --git a/ui.lisp b/ui.lisp @@ -157,17 +157,19 @@ (defparameter *session-lifespan* (* 60 60)) (defun make-session (sid create construct) - (let ((lock (rw.concurrency:make-lock "session ~s")) - (touched (get-universal-time)) - (stepper (make-stepper sid create construct))) + (let ((l (rw.concurrency:make-lock "session ~s")) + (n (get-universal-time)) + (s (make-stepper sid create construct))) (lambda (aid) - (rw.concurrency:with-lock (lock) - (cond - ((eq t aid) - (< (- (get-universal-time) touched) *session-lifespan*)) - (t - (setq touched (get-universal-time)) - (funcall stepper aid))))))) + (rw.concurrency:using-lock + l + (lambda () + (cond + ((eq t aid) + (< (- (get-universal-time) n) *session-lifespan*)) + (t + (setq n (get-universal-time)) + (funcall s aid)))))))) (defun rd (cnt) (let ((s *standard-input*)) @@ -199,29 +201,31 @@ (rd 4)))) (defun make-pool () - (let ((sessions (make-hash-table :test #'equal)) - (lock (rw.concurrency:make-lock "pool ~s"))) + (let ((s (make-hash-table :test #'equal)) + (l (rw.concurrency:make-lock "pool ~s"))) (lambda (create construct deconstruct) (multiple-value-bind (sid aid *renv*) (funcall deconstruct) (let ((aid2 (parse36 aid))) ;; number=action|string=resource (when aid2 (setq aid aid2))) (funcall - (rw.concurrency:with-lock (lock) - (maphash (lambda (k v) - (unless (funcall v t) - (remhash k sessions))) - sessions) - (let ((x (and sid aid (gethash sid sessions)))) - (if x - (lambda () (funcall x aid)) - (do () - ((not (gethash (setq sid (generate-sid)) sessions)) - (setf (gethash sid sessions) - (make-session sid create construct)) - (lambda () - (http-redirect - (funcall construct sid (pretty36 0) *renv*))))))))))))) + (rw.concurrency:using-lock + l + (lambda () + (maphash (lambda (k v) + (unless (funcall v t) + (remhash k s))) + s) + (let ((x (and sid aid (gethash sid s)))) + (if x + (lambda () (funcall x aid)) + (do () + ((not (gethash (setq sid (generate-sid)) s)) + (setf (gethash sid s) + (make-session sid create construct)) + (lambda () + (http-redirect + (funcall construct sid (pretty36 0) *renv*)))))))))))))) (defparameter *pool* (make-pool))