cl-rw

Layered streams for Common Lisp
git clone https://logand.com/git/cl-rw.git/
Log | Files | Refs

commit 99a7238016fa1e73178c02ef2a1863b22b203dcf
parent 42952192d8efeed5c0c6d99541e85ed45413cf9a
Author: Tomas Hlavaty <tom@logand.com>
Date:   Thu, 29 Aug 2013 23:02:09 +0200

rw.concurrency added

Diffstat:
Mcl-rw.asd | 3++-
Aconcurrency.lisp | 126+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 128 insertions(+), 1 deletion(-)

diff --git a/cl-rw.asd b/cl-rw.asd @@ -41,4 +41,5 @@ (:file "xml") (:file "email") (:file "os") - (:file "net"))) + (:file "net") + (:file "concurrency"))) diff --git a/concurrency.lisp b/concurrency.lisp @@ -0,0 +1,126 @@ +;;; Copyright (C) 2013 Tomas Hlavaty <tom@logand.com> +;;; +;;; Permission is hereby granted, free of charge, to any person +;;; obtaining a copy of this software and associated documentation +;;; files (the "Software"), to deal in the Software without +;;; restriction, including without limitation the rights to use, copy, +;;; modify, merge, publish, distribute, sublicense, and/or sell copies +;;; of the Software, and to permit persons to whom the Software is +;;; furnished to do so, subject to the following conditions: +;;; +;;; The above copyright notice and this permission notice shall be +;;; included in all copies or substantial portions of the Software. +;;; +;;; THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +;;; EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +;;; MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +;;; NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +;;; HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +;;; WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +;;; OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +;;; DEALINGS IN THE SOFTWARE. + +(defpackage :rw.concurrency + (:use :cl) + (:export :with-lock + :make-lock + :make-semaphore + :signal-semaphore + :wait-on-semaphore + :make-concurrent-queue + :make-thread + :make-program-server)) + +(in-package :rw.concurrency) + +(defmacro with-lock ((lock) &body body) + #-(or ccl ecl sbcl) (error "RW.CONCURRENCY:WITH-LOCK not ported") + #+ccl `(ccl:with-lock-grabbed (,lock) ,@body) + #+ecl `(mp:with-lock (,lock) ,@body) + #+sbcl `(sb-concurrency::with-mutex (,lock) ,@body)) + +(defun make-lock (name) + #-(or ccl ecl sbcl) (error "RW.CONCURRENCY:MAKE-LOCK not ported") + #+ccl (ccl:make-lock name) + #+ecl (mp:make-lock :name name) + #+sbcl (sb-concurrency::make-mutex :name (string name))) + +(defun make-semaphore () + #-(or ccl ecl sbcl) (error "RW.CONCURRENCY:MAKE-SEMAPHORE not ported") + #+ccl (ccl:make-semaphore) + #+ecl (mp:make-semaphore) + #+sbcl (sb-concurrency::make-semaphore)) + +(defun signal-semaphore (x) + #-(or ccl ecl sbcl) (error "RW.CONCURRENCY:SIGNAL-SEMAPHORE not ported") + #+ccl (ccl:signal-semaphore x) + #+ecl (mp:signal-semaphore x) + #+sbcl (sb-concurrency::signal-semaphore x)) + +(defun wait-on-semaphore (x) + #-(or ccl ecl sbcl) (error "RW.CONCURRENCY:WAIT-ON-SEMAPHORE not ported") + #+ccl (ccl:wait-on-semaphore x) + #+ecl (mp:wait-on-semaphore x) + #+sbcl (sb-concurrency::wait-on-semaphore x)) + +(defun make-concurrent-queue () + (let ((x (cons nil nil)) + (l (make-lock 'concurrent-queue-lock)) + (s (make-semaphore))) + (setf (car x) x) + (lambda (&optional (value nil valuep)) + (if valuep + (let ((y (cons value nil))) + (with-lock (l) + (setf (cdar x) y + (car x) y) + (signal-semaphore s)) + value) + (do (done z) + (done z) + (wait-on-semaphore s) + (with-lock (l) + (unless (eq x (car x)) + (setq done t + z (pop (cdr x))) + (unless (cdr x) + (setf (car x) x))))))))) + +;; (setq q (make-concurrent-queue)) +;; (funcall q 1) +;; (funcall q 2) +;; (funcall q 3) +;; (funcall q) + +(defun make-thread (name fn) + #-(or ccl ecl sbcl) (error "RW.CONCURRENCY:MAKE-THREAD not ported") + #+ccl (ccl:process-run-function name fn) + #+ecl (mp:process-run-function name fn) + #+sbcl (sb-concurrency::make-thread fn :name (string name))) + +(defun make-program-server (command args writer reader) + (let ((p (rw.os:make-program :stream :stream command args)) + (wq (make-concurrent-queue))) + (make-thread 'program-server-writer + (let ((s (funcall p 'input-stream))) + (lambda () + (do (x) + ((not (setq x (funcall wq))) + (close s)) + (funcall writer x s))))) + (let ((l (make-lock 'program-server-lock)) + (s (funcall p 'output-stream))) + (lambda (query) + (with-lock (l) + (when wq + (cond + (query + (funcall wq query) + (funcall reader s)) + (t + (funcall wq nil) + (setq wq nil) + (funcall p 'wait) + (multiple-value-bind (status code) (funcall p 'status-and-code) + (assert (eq :exited status)) + (assert (zerop code)))))))))))