cl-rw

Layered streams for Common Lisp
git clone https://logand.com/git/cl-rw.git/
Log | Files | Refs

concurrency.lisp (5872B)


      1 ;;; Copyright (C) 2013, 2014 Tomas Hlavaty <tom@logand.com>
      2 ;;;
      3 ;;; Permission is hereby granted, free of charge, to any person
      4 ;;; obtaining a copy of this software and associated documentation
      5 ;;; files (the "Software"), to deal in the Software without
      6 ;;; restriction, including without limitation the rights to use, copy,
      7 ;;; modify, merge, publish, distribute, sublicense, and/or sell copies
      8 ;;; of the Software, and to permit persons to whom the Software is
      9 ;;; furnished to do so, subject to the following conditions:
     10 ;;;
     11 ;;; The above copyright notice and this permission notice shall be
     12 ;;; included in all copies or substantial portions of the Software.
     13 ;;;
     14 ;;; THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     15 ;;; EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     16 ;;; MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     17 ;;; NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
     18 ;;; HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
     19 ;;; WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     20 ;;; OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
     21 ;;; DEALINGS IN THE SOFTWARE.
     22 
     23 (defpackage :rw.concurrency
     24   (:use :cl)
     25   (:export :make-concurrent-queue
     26            :make-lock
     27            :make-program-server
     28            :make-semaphore
     29            :make-thread
     30            :signal-semaphore
     31            :threads-supported-p
     32            :using-lock
     33            :wait-on-semaphore))
     34 
     35 (in-package :rw.concurrency)
     36 
     37 (defun using-lock (lock thunk)
     38   #-(or allegro ccl ecl mkcl cmucl sbcl)
     39   (error "RW.CONCURRENCY:USING-LOCK not ported")
     40   #+allegro (mp:with-process-lock (lock) (funcall thunk))
     41   #+ccl (ccl:with-lock-grabbed (lock) (funcall thunk))
     42   #+ecl (mp:with-lock (lock) (funcall thunk))
     43   #+mkcl (mt:with-lock (lock) (funcall thunk))
     44   #+cmucl (mp:with-lock-held (lock) (funcall thunk))
     45   #+sbcl (sb-concurrency::with-mutex (lock) (funcall thunk)))
     46 
     47 (defun make-lock (name)
     48   #-(or allegro ccl ecl mkcl cmucl sbcl)
     49   (error "RW.CONCURRENCY:MAKE-LOCK not ported")
     50   #+allegro (mp:make-process-lock :name name)
     51   #+ccl (ccl:make-lock name)
     52   #+ecl (mp:make-lock :name name)
     53   #+mkcl (mt:make-lock :name name)
     54   #+cmucl (mp:make-lock name :kind :error-check)
     55   #+sbcl (sb-concurrency::make-mutex :name (string name)))
     56 
     57 (defun make-semaphore ()
     58   #-(or allegro ccl ecl mkcl sbcl)
     59   (error "RW.CONCURRENCY:MAKE-SEMAPHORE not ported")
     60   #+allegro (mp:make-gate nil)
     61   #+ccl (ccl:make-semaphore)
     62   #+ecl (mp:make-semaphore)
     63   #+mkcl (mt:make-semaphore)
     64   #+sbcl (sb-concurrency::make-semaphore))
     65 
     66 (defun signal-semaphore (x)
     67   #-(or allegro ccl ecl mkcl sbcl)
     68   (error "RW.CONCURRENCY:SIGNAL-SEMAPHORE not ported")
     69   #+allegro (mp:put-semaphore x)
     70   #+ccl (ccl:signal-semaphore x)
     71   #+ecl (mp:signal-semaphore x)
     72   #+mkcl (mt:semaphore-signal x)
     73   #+sbcl (sb-concurrency::signal-semaphore x))
     74 
     75 (defun wait-on-semaphore (x)
     76   #-(or ccl ecl mkcl sbcl)
     77   (error "RW.CONCURRENCY:WAIT-ON-SEMAPHORE not ported")
     78   #+ccl (ccl:wait-on-semaphore x)
     79   #+ecl (mp:wait-on-semaphore x)
     80   #+mkcl (mt:semaphore-wait x)
     81   #+sbcl (sb-concurrency::wait-on-semaphore x))
     82 
     83 (defun make-concurrent-queue ()
     84   (let ((x (cons nil nil))
     85         (l (make-lock 'concurrent-queue-lock))
     86         (s (make-semaphore)))
     87     (setf (car x) x)
     88     (lambda (&optional (value nil valuep))
     89       (if valuep
     90           (let ((y (cons value nil)))
     91             (using-lock l
     92                         (lambda ()
     93                           (setf (cdar x) y
     94                                 (car x) y)
     95                           (signal-semaphore s)))
     96             value)
     97           (do (done z)
     98               (done z)
     99             (wait-on-semaphore s)
    100             (using-lock l
    101                         (lambda ()
    102                           (unless (eq x (car x))
    103                             (setq done t
    104                                   z (pop (cdr x)))
    105                             (unless (cdr x)
    106                               (setf (car x) x))))))))))
    107 
    108 ;; (setq q (make-concurrent-queue))
    109 ;; (funcall q 1)
    110 ;; (funcall q 2)
    111 ;; (funcall q 3)
    112 ;; (funcall q)
    113 
    114 (defun threads-supported-p ()
    115   #+(or allegro ccl ecl mkcl cmucl sb-thread (and clisp mt))
    116   t)
    117 
    118 (defun make-thread (name thunk)
    119   #-(or allegro ccl ecl mkcl cmucl sbcl (and clisp mt))
    120   (error "RW.CONCURRENCY:MAKE-THREAD not ported")
    121   #+allegro (mp:process-run-function name thunk)
    122   #+ccl (ccl:process-run-function name thunk)
    123   #+ecl (mp:process-run-function name thunk)
    124   #+mkcl (mt:thread-run-function name thunk)
    125   #+cmucl (mp:make-process thunk :name name)
    126   #+sbcl (sb-concurrency::make-thread thunk :name (string name))
    127   #+(and clisp mt) (mt:make-thread thunk :name name))
    128 
    129 (defun make-program-server (command args writer reader)
    130   (let ((p (rw.os:make-program :stream :stream command args nil))
    131         (wq (make-concurrent-queue)))
    132     (make-thread 'program-server-writer
    133                  (let ((s (funcall p :input-stream)))
    134                    (lambda ()
    135                      (do (x)
    136                          ((not (setq x (funcall wq)))
    137                           (close s))
    138                        (funcall writer x s)))))
    139     (let ((l (make-lock 'program-server-lock))
    140           (s (funcall p :output-stream)))
    141       (lambda (query)
    142         (using-lock l
    143                     (lambda ()
    144                       (when wq
    145                         (cond
    146                           (query
    147                            (funcall wq query)
    148                            (funcall reader s))
    149                           (t
    150                            (funcall wq nil)
    151                            (setq wq nil)
    152                            (funcall p :wait)
    153                            (multiple-value-bind (status code)
    154                                (funcall p :status-and-code)
    155                              (assert (eq :exited status))
    156                              (assert (zerop code))))))))))))