Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
;;; -*- Mode: Lisp ; Base: 10 ; Syntax: ANSI-Common-Lisp -*-
;;;;; Simple TCP server
(in-package :philip-jose)
;;;; Generic TCP routines
(defun make-tcp-server-socket (address port)
  "Create a passive internet stream socket (IPv6 disabled), bind it to ADDRESS
and PORT, put it in the listening state and return it.
When PORT is NIL the socket is bound with port 0 instead."
  (let ((listener (make-socket :address-family :internet
                               :type :stream
                               :connect :passive
                               :ipv6 nil)))
    (bind-address listener address :port (if port port 0))
    (socket-listen listener)
    listener))
(defun call-connection-handler (socket handler &rest args)
  "Apply HANDLER to ARGS with both *STANDARD-INPUT* and *STANDARD-OUTPUT* bound
to SOCKET, then flush pending output.  SOCKET is closed on the way out, whether
HANDLER returns normally or unwinds."
  (with-trivial-logging ()
    (unwind-protect
         ;; The two bindings are independent, so a plain LET is enough.
         (let ((*standard-input* socket)
               (*standard-output* socket))
           (apply handler args)
           ;; *STANDARD-OUTPUT* is the socket here, so this flushes the reply.
           (finish-output *standard-output*))
      ;; Cleanup form: always release the connection.
      (close socket))))
;;;; Spawners for a multi-threaded system having one thread per connection.
(defun make-connection-worker (socket handler &rest args)
  "Return a function of no arguments that serves the connection SOCKET by
calling CALL-CONNECTION-HANDLER with SOCKET, HANDLER and ARGS."
  (flet ((serve-connection ()
           (apply #'call-connection-handler socket handler args)))
    #'serve-connection))
(defun connection-spawner (listener-socket handler)
  "Return an event callback of two arguments, FD and EVTYPE, for LISTENER-SOCKET.
On a :READ event the callback accepts the pending connection and starts a
thread named \"connection worker\" that runs HANDLER on it (through
MAKE-CONNECTION-WORKER).  Any other event type is logged as an error on FD."
  #'(lambda (fd evtype)
      (if (eql evtype :read)
          (let ((connection (accept-connection listener-socket)))
            ;; :NAME must be an argument of MAKE-THREAD itself; placed after the
            ;; call's closing paren it is just a stray form and the worker
            ;; thread ends up unnamed.
            (make-thread (make-connection-worker connection handler)
                         :name "connection worker"))
          (logger "~&Error on FD ~A" fd))))
(defun add-multi-threaded-server (event-base listener-socket handler)
  "Register a persistent :READ callback for LISTENER-SOCKET's file descriptor
on EVENT-BASE.  The callback is built by CONNECTION-SPAWNER, so every accepted
connection is handed to HANDLER in a thread of its own.
Returns the value of IOMUX::ADD-FD."
  (let ((listener-fd (sockets::socket-fd listener-socket))
        (on-event (connection-spawner listener-socket handler)))
    (iomux::add-fd event-base listener-fd
                   :read on-event
                   :persistent t)))
;;;; Spawners for a single-threaded system
;; Generic code that implements an efficient non-blocking single-threaded server
;; based on an event-loop and incremental message parsing:
;; maintain incrementally parsed request buffers
;; and process requests when complete.
;;
;; The same single server thread can listen to many event sources at once and
;; implement many servers using this (or other compatible) protocols.
;;
;; Reading bytes (or -- ugly hack -- latin1) would avoid ugly SB-IMPL::END-OF-INPUT-IN-CHARACTER
;; when a packet is cut in the middle of a multi-byte character representation.
;; For anything non-trivial, we really want bytes when reading/writing, and a
;; flexible stream protocol.
;;
;; My design choice: I want to minimize the time spent in
;; the single-threaded network message parser/dispatcher
;; so although my source code for the parser will have characters and strings,
;; I will be processing bytes directly; the forked/threaded clients can do
;; octets-to-string on whatever the parser gives them.
;;
;; As long as I only use ASCII this will do with BASE-CHAR.
(defun add-connection-handler (event-base connection incremental-parser request-handler
                               &key (element-type 'base-char)
                                    (external-format :iso-8859-1) ;; :default
                                    (max-buffer-size *max-request-size*))
  "Register a persistent :READ handler for CONNECTION on EVENT-BASE.
The handler accumulates incoming data in a buffer of MAX-BUFFER-SIZE elements
of ELEMENT-TYPE and calls INCREMENTAL-PARSER after every read.

INCREMENTAL-PARSER receives the buffer, the number of elements filled so far
and the state kept from its previous call (initially NIL), and returns two
values.  When the first value is true the request is complete: REQUEST-HANDLER
is called with CONNECTION, the buffer, the fill size and the second value, and
the connection is closed.  Otherwise the second value is remembered as the
parser state for the next packet.

The connection is also closed when the buffer fills up without a complete
request, when a read returns zero bytes (the peer closed its end) without a
complete request, on any event other than :READ, and on any condition
signalled while handling an event (the condition is logged first).

EXTERNAL-FORMAT is accepted but not used here.
Returns the event handle obtained from IOMUX::ADD-FD."
  ;;; There is no race condition between setting handle and reading it in close-handler,
  ;;; because events are not handled asynchronously, but only at the next event-dispatch
  ;;; so everything we do inside a handler is properly isolated from other handlers
  ;;; (but not from other threads).
  (declare (ignorable element-type external-format max-buffer-size)) ;---***
  (let* ((bytes-p (typep 0 element-type))
         ;; Fill with 0 for numeric element types, with a space otherwise.
         (initial-element (if bytes-p 0 #\space))
         (buffer (make-array (list max-buffer-size)
                             :element-type element-type
                             :initial-element initial-element))
         (size 0)                       ; number of buffer elements filled so far
         (state nil)                    ; parser state carried between packets
         handle)                        ; set once the event is registered, below
    (labels
        ((close-handler ()
           ;; Unregister the event first, then release the descriptor.
           (remove-event event-base handle)
           (close connection))
         (handler (fd evtype)
           (declare (ignorable fd))
           (handler-case
               (cond
                 ((eql evtype :read)
                  ;;--- cannot read-sequence, because it will block. Have to use read.
                  (let ((packet-size
                         (cffi:with-pointer-to-vector-data (buf-ptr buffer)
                           ;; Append right after the data already buffered.
                           (et:read fd (cffi:inc-pointer buf-ptr size) (- max-buffer-size size)))))
                    (incf size packet-size)
                    (logger "~&Incoming packet from ~A: ~D bytes~%"
                            (sockets::vector-to-dotted
                             (sockets::name
                              (sockets::remote-address connection)))
                            packet-size)
                    (multiple-value-bind (x y)
                        (funcall incremental-parser buffer size state)
                      (cond
                        (x
                         (funcall request-handler connection buffer size y)
                         (close-handler))
                        ((= size max-buffer-size)
                         ;;---*** log an error?
                         (close-handler))
                        ((zerop packet-size)
                         ;; The read is always asked for at least one byte here
                         ;; (a full buffer closes the connection above), so a
                         ;; zero-byte read means the peer closed its end.  No
                         ;; more data will ever arrive: drop the connection
                         ;; instead of keeping a handler on a descriptor that
                         ;; stays readable forever.
                         (close-handler))
                        (t
                         (setf state y)))))) ; get ready for next packet
                 (t
                  (close-handler)))
             (condition (c)
               (logger "~&~S~%" c)
               (close-handler)))))
      (setf handle (iomux::add-fd event-base (sockets::socket-fd connection)
                                  :read #'handler :persistent t)))))
(defun add-single-threaded-server (event-base socket incremental-parser request-handler &rest keys
                                   &key element-type external-format max-buffer-size)
  "Register a persistent :READ callback for the listening SOCKET on EVENT-BASE.
On each :READ event the callback accepts a connection, logs the peer address
and installs a per-connection handler through ADD-CONNECTION-HANDLER, passing
along INCREMENTAL-PARSER, REQUEST-HANDLER and the keyword arguments KEYS
exactly as supplied.  Any other event type is logged.
Returns the value of IOMUX::ADD-FD."
  (declare (ignorable element-type external-format max-buffer-size))
  (flet ((on-listener-event (fd evtype)
           (with-trivial-logging ()
             (if (eql evtype :read)
                 (let ((connection (accept-connection socket)))
                   (logger "~&Connection opened with ~A~%"
                           (sockets::vector-to-dotted
                            (sockets::name (sockets::remote-address connection))))
                   ;; KEYS is forwarded untouched so that ADD-CONNECTION-HANDLER
                   ;; applies its own defaults for anything the caller omitted.
                   (apply #'add-connection-handler
                          event-base connection incremental-parser request-handler
                          keys))
                 (logger "~&Weird event ~S on ~S~%" evtype fd)))))
    (iomux::add-fd event-base (sockets::socket-fd socket)
                   :read #'on-listener-event :persistent t)))
(defun ensure-event-base ()
  "Create the IOMUX event base and store it in *EVENT-BASE* unless one is
already there.  Returns the new event base when it creates one, NIL otherwise."
  ;; This MAKE-INSTANCE form cannot live in specials.lisp,
  ;; because compile time is not run time.
  (unless *event-base*
    (setf *event-base* (make-instance 'iomux:event-base))))
;; One central server socket
(defun make-server (&optional (address +ipv4-unspecified+) (port *port*))
  "(Re)create the central server socket.
Any existing *SERVER* is shut down first: its event registration, if any, is
removed from *EVENT-BASE* and the socket is closed.  Then the event base is
ensured, a new listening socket bound to ADDRESS and PORT is stored in
*SERVER*, and *PORT* is updated to the socket's actual local port, which is
also the return value."
  (when *server*
    ;; Unregister the old listener's event before closing its socket.
    ;; Otherwise the persistent callback stays in the event base, still
    ;; pointing at the closed socket, on a descriptor number that a later
    ;; socket may reuse.
    (when (and (boundp '*server-event*) *server-event*)
      (remove-event *event-base* *server-event*)
      (setf *server-event* nil))
    (close *server*))
  (ensure-event-base)
  (setf *server* (make-tcp-server-socket address port))
  (setf *port* (sockets::local-port *server*)))
(def*fun start-multi-threaded-server (&key (handler #'sexp-request-handler)
                                           client-starter process-events)
  ;; Start the central server in thread-per-connection mode.
  ;;   HANDLER        -- connection handler given to ADD-MULTI-THREADED-SERVER.
  ;;   CLIENT-STARTER -- if non-NIL, a function of no arguments called once the
  ;;                     server socket exists, before the event is registered.
  ;;   PROCESS-EVENTS -- if non-NIL, run EVENT-DISPATCH on *EVENT-BASE* after
  ;;                     registering; otherwise return right away.
  ;; The event registration is stored in *SERVER-EVENT*.
  (make-server)
  (if client-starter
      (funcall client-starter))
  (format *error-output* "Starting server on port ~A~%" *port*)
  (finish-output *error-output*)
  (let ((registration (add-multi-threaded-server *event-base* *server* handler)))
    (setf *server-event* registration))
  (if process-events
      (event-dispatch *event-base*)
      nil))
(def*fun start-single-threaded-server (&key (incremental-parser (make-incremental-reader))
                                            (handler (make-incrementally-parsed-request-handler #'handle-sexp-request))
                                            client-starter process-events)
  ;; Start the central server in single-threaded, event-driven mode.
  ;;   INCREMENTAL-PARSER -- parser given to ADD-SINGLE-THREADED-SERVER.
  ;;   HANDLER            -- request handler given to ADD-SINGLE-THREADED-SERVER.
  ;;   CLIENT-STARTER     -- if non-NIL, a function of no arguments called once
  ;;                         the server socket exists, before the event is
  ;;                         registered.
  ;;   PROCESS-EVENTS     -- if non-NIL, run EVENT-DISPATCH on *EVENT-BASE* after
  ;;                         registering; otherwise return right away.
  ;; The event registration is stored in *SERVER-EVENT*.
  (make-server)
  (if client-starter
      (funcall client-starter))
  (format *error-output* "Starting server on port ~A~%" *port*)
  (finish-output *error-output*)
  (let ((registration (add-single-threaded-server
                       *event-base* *server* incremental-parser handler)))
    (setf *server-event* registration))
  (if process-events
      (event-dispatch *event-base*)
      nil))
(def*fun stop-server ()
  ;; Shut the central server down: remove its event from *EVENT-BASE*, close
  ;; the listening socket, and reset *SERVER-EVENT* and *SERVER* to NIL.
  ;; Does nothing for whichever of the two is already NIL, so calling it when
  ;; no server is running (or calling it twice) is harmless.  Returns NIL.
  ;;
  ;; The event is removed BEFORE the socket is closed -- the same order the
  ;; per-connection CLOSE-HANDLER uses -- so the event base is never asked to
  ;; stop watching a descriptor that has already been closed.
  (when *server-event*
    (remove-event *event-base* *server-event*)
    (setf *server-event* nil))
  (when *server*
    (close *server*)
    (setf *server* nil)))