;;; tcp-server.lisp
;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
;;;;; Simple TCP server

(in-package :philip-jose)


;;;; Generic TCP routines

(defun make-tcp-server-socket (address port)
  "Create a passive IPv4 stream socket, bind it to ADDRESS and start listening.
PORT may be NIL, in which case 0 is passed to BIND-ADDRESS.
Returns the listening socket.  If binding or listening signals, the socket is
closed before the condition propagates, so its file descriptor is not leaked."
  (let ((socket (make-socket :address-family :internet
                             :type :stream
                             :connect :passive
                             :ipv6 nil))
        (ready nil))
    (unwind-protect
         (progn
           (bind-address socket address :port (or port 0))
           (socket-listen socket)
           ;; Only mark the socket as ready once both steps have succeeded.
           (setf ready t)
           socket)
      ;; Reached with READY still NIL only on a non-local exit from the setup above.
      (unless ready
        (close socket)))))

(defun call-connection-handler (socket handler &rest args)
  "Apply HANDLER to ARGS with *STANDARD-INPUT* and *STANDARD-OUTPUT* bound to SOCKET.
Pending output is flushed once HANDLER returns, and SOCKET is closed on the way
out whether HANDLER returned normally or not."
  (with-trivial-logging ()
    (unwind-protect
         ;; Both streams are bound to the same socket; neither binding depends on the other.
         (let ((*standard-input* socket)
               (*standard-output* socket))
           (apply handler args)
           (finish-output))
      (close socket))))


;;;; Spawners for a multi-threaded system having one thread per connection.
(defun make-connection-worker (socket handler &rest args)
  "Return a thunk that runs CALL-CONNECTION-HANDLER on SOCKET, HANDLER and ARGS."
  (lambda ()
    (apply #'call-connection-handler socket handler args)))

(defun connection-spawner (listener-socket handler)
  "Return an event callback of two arguments, FD and EVTYPE.
On a :READ event the callback accepts a connection from LISTENER-SOCKET and
starts a thread named \"connection worker\" that runs HANDLER on it.
Any other event type is logged as an error on FD."
  #'(lambda (fd evtype)
      (if (eql evtype :read)
          (let ((connection (accept-connection listener-socket)))
            ;; :NAME must be an argument to MAKE-THREAD; it used to sit outside
            ;; the call, leaving the thread unnamed.
            (make-thread (make-connection-worker connection handler)
                         :name "connection worker"))
          (logger "~&Error on FD ~A" fd))))

(defun add-multi-threaded-server (event-base listener-socket handler)
  "Register a persistent :READ callback on EVENT-BASE for LISTENER-SOCKET's descriptor.
The callback is the one built by CONNECTION-SPAWNER for LISTENER-SOCKET and HANDLER.
Returns whatever IOMUX::ADD-FD returns."
  (let* ((listener-fd (sockets::socket-fd listener-socket))
         (spawner (connection-spawner listener-socket handler)))
    (iomux::add-fd event-base listener-fd
                   :read spawner
                   :persistent t)))


;;;; Spawners for a single-threaded system
;; Generic code that implements an efficient non-blocking single-threaded server
;; based on an event loop and incremental message parsing:
;; maintain incrementally parsed request buffers
;; and process requests when complete.
;;
;; The same single server thread can listen to many event sources at once and
;; implement many servers using this (or other compatible) protocols.
;;
;; Reading bytes (or -- ugly hack -- latin1) would avoid ugly SB-IMPL::END-OF-INPUT-IN-CHARACTER
;; when a packet is cut in the middle of a multi-byte character representation.
;; For anything non-trivial, we really want bytes when reading/writing, and a
;; flexible stream protocol.
;;
;; My design choice: I want to minimize the time spent in
;; the single-threaded network message parser/dispatcher
;; so although my source code for the parser will have characters and strings,
;; I will be processing bytes directly; the forked/threaded clients can do
;; octets-to-string on whatever the parser gives them.
;;
;; As long as I only use ASCII this will do with BASE-CHAR.

(defun add-connection-handler (event-base connection incremental-parser request-handler
                                          &key (element-type 'base-char)
                                          (external-format :iso-8859-1) ;; :default
                                          (max-buffer-size *max-request-size*))
  "Register a persistent :READ handler on EVENT-BASE for CONNECTION's descriptor.
Each :READ event reads whatever is available into a buffer of MAX-BUFFER-SIZE
elements of type ELEMENT-TYPE, then calls INCREMENTAL-PARSER with the buffer,
the number of elements filled so far and the parser state from the previous call
(initially NIL).  The parser returns two values:
  - if the first is true, REQUEST-HANDLER is called with CONNECTION, the buffer,
    the fill size and the second value, and the connection is then closed;
  - otherwise the second value is kept as the state for the next call.
The connection is also closed when the buffer fills up without a complete
request, when the peer closes its end (a read of zero bytes), on any event other
than :READ, and when any condition is signalled while handling an event.
EXTERNAL-FORMAT is accepted but currently unused.
Returns the event handle obtained from IOMUX::ADD-FD."
  ;;; There is no race condition between setting handle and reading it in close-handler,
  ;;; because events are not handled asynchronously, but only at the next event-dispatch
  ;;; so everything we do inside a handler is properly isolated from other handlers
  ;;; (but not from other threads).
  (declare (ignorable element-type external-format max-buffer-size)) ;---***
  (let* ((bytes-p (typep 0 element-type)) ; true when ELEMENT-TYPE can hold the integer 0
         (initial-element (if bytes-p 0 #\space))
         (buffer (make-array (list max-buffer-size)
                             :element-type element-type
                             :initial-element initial-element))
         (size 0)                       ; number of buffer elements filled so far
         (state nil)                    ; parser state carried between packets
         handle)
    (labels
        ((close-handler ()
           ;; Unregister the event before closing the descriptor it refers to.
           (remove-event event-base handle)
           (close connection))
         (handler (fd evtype)
           (declare (ignorable fd))
           (handler-case
               (cond
                 ((eql evtype :read)
                  ;;--- cannot read-sequence, because it will block. Have to use read.
                  (let ((packet-size
                         (cffi:with-pointer-to-vector-data (buf-ptr buffer)
                           (et:read fd (cffi:inc-pointer buf-ptr size) (- max-buffer-size size)))))
                    (cond
                      ((zerop packet-size)
                       ;; The buffer cannot be full here (a full buffer closes the
                       ;; connection below), so a zero-byte read means the peer closed
                       ;; its end.  Without this the persistent handler would keep
                       ;; firing on a descriptor that stays readable forever.
                       (logger "~&Connection closed by peer before a complete request was received~%")
                       (close-handler))
                      (t
                       (incf size packet-size)
                       (logger "~&Incoming packet from ~A: ~D bytes~%"
                               (sockets::vector-to-dotted
                                (sockets::name
                                 (sockets::remote-address connection)))
                               packet-size)
                       (multiple-value-bind (complete-p parser-state)
                           (funcall incremental-parser buffer size state)
                         (cond
                           (complete-p
                            (funcall request-handler connection buffer size parser-state)
                            (close-handler))
                           ((= size max-buffer-size)
                            ;;---*** log an error?
                            (close-handler))
                           (t
                            ;; get ready for next packet
                            (setf state parser-state))))))))
                 (t
                  (close-handler)))
             (condition (c)
               (logger "~&~S~%" c)
               (close-handler)))))
      (setf handle (iomux::add-fd event-base (sockets::socket-fd connection)
                                  :read #'handler :persistent t)))))

(defun add-single-threaded-server (event-base socket incremental-parser request-handler &rest keys
                               &key element-type external-format max-buffer-size)
  "Register a persistent :READ callback on EVENT-BASE for the listening SOCKET.
On each :READ event a connection is accepted, logged, and handed to
ADD-CONNECTION-HANDLER together with INCREMENTAL-PARSER, REQUEST-HANDLER and the
keyword arguments KEYS exactly as supplied.  Other event types are only logged.
Returns whatever IOMUX::ADD-FD returns."
  (declare (ignorable element-type external-format max-buffer-size))
  (flet ((on-listener-event (fd evtype)
           (declare (ignorable fd))
           (with-trivial-logging ()
             (if (eql evtype :read)
                 (let ((client (accept-connection socket)))
                   (logger "~&Connection opened with ~A~%"
                           (sockets::vector-to-dotted
                            (sockets::name (sockets::remote-address client))))
                   ;; The keywords are forwarded untouched so that
                   ;; ADD-CONNECTION-HANDLER applies its own defaults.
                   (apply #'add-connection-handler
                          event-base client incremental-parser request-handler keys))
                 (logger "~&Weird event ~S on ~S~%" evtype fd)))))
    (iomux::add-fd event-base (sockets::socket-fd socket)
                   :read #'on-listener-event :persistent t)))





(defun ensure-event-base ()
  "Create the event base and store it in *EVENT-BASE* unless one is already there.
Returns the new event base when one was created, NIL otherwise."
  ;; The instance is created here rather than in specials.lisp
  ;; because compile time is not run time.
  (unless *event-base*
    (setf *event-base* (make-instance 'iomux:event-base))))

;; One central server socket
(defun make-server (&optional (address +ipv4-unspecified+) (port *port*))
  "Replace the server socket in *SERVER* with a new one listening on ADDRESS and PORT.
Any socket already in *SERVER* is closed first, and the event base is created if
needed.  *PORT* is updated to the local port of the new socket, which is also
the value returned."
  (let ((previous *server*))
    (when previous
      (close previous)))
  (ensure-event-base)
  ;; SETF assigns its pairs in order, so *SERVER* is already the new socket
  ;; when its local port is read.
  (setf *server* (make-tcp-server-socket address port)
        *port* (sockets::local-port *server*)))

;; Start the server socket and serve each accepted connection with HANDLER
;; through ADD-MULTI-THREADED-SERVER.
;; CLIENT-STARTER, when supplied, is called with no arguments once the socket
;; exists and before the listener event is registered.
;; When PROCESS-EVENTS is true, EVENT-DISPATCH is run on *EVENT-BASE*.
(def*fun start-multi-threaded-server (&key (handler #'sexp-request-handler)
                                     client-starter process-events)
  (make-server)
  (and client-starter
       (funcall client-starter))
  (let ((out *error-output*))
    (format out "Starting server on port ~A~%" *port*)
    (finish-output out))
  (setf *server-event*
        (add-multi-threaded-server *event-base* *server* handler))
  (and process-events
       (event-dispatch *event-base*)))

;; Start the server socket and serve connections from the event loop through
;; ADD-SINGLE-THREADED-SERVER, using INCREMENTAL-PARSER to detect complete
;; requests and HANDLER to process them.
;; CLIENT-STARTER, when supplied, is called with no arguments once the socket
;; exists and before the listener event is registered.
;; When PROCESS-EVENTS is true, EVENT-DISPATCH is run on *EVENT-BASE*.
(def*fun start-single-threaded-server (&key (incremental-parser (make-incremental-reader))
                                            (handler (make-incrementally-parsed-request-handler #'handle-sexp-request))
                                            client-starter process-events)
  (make-server)
  (and client-starter
       (funcall client-starter))
  (let ((out *error-output*))
    (format out "Starting server on port ~A~%" *port*)
    (finish-output out))
  (setf *server-event*
        (add-single-threaded-server *event-base* *server* incremental-parser handler))
  (and process-events
       (event-dispatch *event-base*)))

;; Stop the central server: unregister its listener event, close its socket and
;; reset *SERVER* and *SERVER-EVENT* to NIL.  Returns NIL.
;; The event is removed before the socket is closed, matching the order used
;; for per-connection handlers, so that the event base is never asked to
;; unregister an already-closed descriptor.
;; Calling this when no server is running is a no-op.
(def*fun stop-server ()
  (when *server-event*
    (remove-event *event-base* *server-event*))
  (when *server*
    (close *server*))
  (setf *server* nil)
  (setf *server-event* nil))