flup-server

view flup/server/preforkserver.py @ 102:6ea1ffac1bcb

Restore check of the absolute number of children against maxSpare and document rationale.
author Allan Saddi <allan@saddi.com>
date Mon, 17 Aug 2009 12:42:43 -0700
parents e0e7e885f6cc
children 57375deb17c3
line source
1 # Copyright (c) 2005 Allan Saddi <allan@saddi.com>
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions
6 # are met:
7 # 1. Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # 2. Redistributions in binary form must reproduce the above copyright
10 # notice, this list of conditions and the following disclaimer in the
11 # documentation and/or other materials provided with the distribution.
12 #
13 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23 # SUCH DAMAGE.
24 #
25 # $Id$
27 __author__ = 'Allan Saddi <allan@saddi.com>'
28 __version__ = '$Revision$'
30 import sys
31 import os
32 import socket
33 import select
34 import errno
35 import signal
36 import random
37 import time
try:
    import fcntl
except ImportError:
    def setCloseOnExec(sock):
        """No-op stand-in used when the fcntl module cannot be imported."""
        pass
else:
    def setCloseOnExec(sock):
        """
        Mark sock's file descriptor close-on-exec.

        sock may be any object with a fileno() method. The current
        descriptor flags are read first and FD_CLOEXEC is OR-ed in, so
        any other flag already set on the descriptor is preserved.
        """
        fd = sock.fileno()
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
# If running Python < 2.4, require eunuchs module for socket.socketpair().
# See <http://www.inoi.fi/open/trac/eunuchs>.
if not hasattr(socket, 'socketpair'):
    try:
        import eunuchs.socketpair
    except ImportError:
        # TODO: Other alternatives? Perhaps using os.pipe()?
        raise ImportError('Requires eunuchs module for Python < 2.4')

    def socketpair():
        """
        Stand-in for socket.socketpair() built on eunuchs.

        Returns a pair of connected AF_UNIX/SOCK_STREAM socket objects.
        """
        s1, s2 = eunuchs.socketpair.socketpair()
        p, c = (socket.fromfd(s1, socket.AF_UNIX, socket.SOCK_STREAM),
                socket.fromfd(s2, socket.AF_UNIX, socket.SOCK_STREAM))
        # socket.fromfd() duplicates the descriptor, so the raw ones
        # returned by eunuchs are no longer needed.
        os.close(s1)
        os.close(s2)
        return p, c

    # Install the fallback so callers can use socket.socketpair() directly.
    socket.socketpair = socketpair
class PreforkServer(object):
    """
    A preforked server model conceptually similar to Apache httpd(2). At
    any given time, ensures there are at least minSpare children ready to
    process new requests (up to a maximum of maxChildren children total).
    If the number of idle children is ever above maxSpare, the extra
    children are killed.

    If maxRequests is positive, each child will only handle that many
    requests in its lifetime before exiting.

    jobClass should be a class whose constructor takes at least two
    arguments: the client socket and client address. jobArgs, which
    must be a list or tuple, is any additional (static) arguments you
    wish to pass to the constructor.

    jobClass should have a run() method (taking no arguments) that does
    the actual work. When run() returns, the request is considered
    complete and the child process moves to idle state.
    """
    def __init__(self, minSpare=1, maxSpare=5, maxChildren=50,
                 maxRequests=0, jobClass=None, jobArgs=()):
        """
        Store the pool limits and the job factory.

        Raises ValueError if minSpare is below 1 or maxSpare is below
        minSpare. maxChildren is silently raised to maxSpare if smaller.
        """
        if minSpare < 1:
            raise ValueError("minSpare must be at least 1!")
        if maxSpare < minSpare:
            raise ValueError("maxSpare must be greater than, or equal to, minSpare!")

        self._minSpare = minSpare
        self._maxSpare = maxSpare
        self._maxChildren = max(maxSpare, maxChildren)
        self._maxRequests = maxRequests
        self._jobClass = jobClass
        self._jobArgs = jobArgs

        # Internal state of children. Maps pids to dictionaries with two
        # members: 'file' and 'avail'. 'file' is the socket to that
        # individual child and 'avail' is whether or not the child is
        # free to process requests.
        self._children = {}

        # Child sockets still waiting to be told to exit (filled in by the
        # SIGUSR1 handler) and the time the last one was dismissed.
        self._children_to_purge = []
        self._last_purge = 0

        # Loop/signal state. Given defaults here so that the handlers and
        # _restoreSignalHandlers() are safe to call before run().
        self._keepGoing = True
        self._hupReceived = False
        self._oldSIGs = []

    def run(self, sock):
        """
        The main loop. Pass a socket that is ready to accept() client
        connections. Return value will be True or False indicating whether
        or not the loop was exited due to SIGHUP.
        """
        # Set up signal handlers.
        self._keepGoing = True
        self._hupReceived = False
        self._installSignalHandlers()

        # Don't want operations on main socket to block.
        sock.setblocking(0)

        # Set close-on-exec
        setCloseOnExec(sock)

        # Main loop.
        while self._keepGoing:
            # Maintain minimum number of children. Note that we are checking
            # the absolute number of children, not the number of "available"
            # children. We explicitly test against _maxSpare to maintain
            # an *optimistic* absolute minimum. The number of children will
            # always be in the range [_maxSpare, _maxChildren].
            while len(self._children) < self._maxSpare:
                if not self._spawnChild(sock):
                    break

            # Wait on any socket activity from live children.
            r = [x['file'] for x in self._children.values()
                 if x['file'] is not None]

            if len(r) == len(self._children) and not self._children_to_purge:
                timeout = None
            else:
                # There are dead children that need to be reaped, ensure
                # that they are by timing out, if necessary. Or there are some
                # children that need to die.
                timeout = 2

            # Purging is throttled: sockets are only offered to select()
            # for writing once 10 seconds have passed since the last purge.
            # A copy is used so the pending list is never aliased.
            if self._children_to_purge and \
               time.time() > self._last_purge + 10:
                w = list(self._children_to_purge)
            else:
                w = []

            try:
                r, w, e = select.select(r, w, [], timeout)
            except select.error:
                if self._lastErrno() != errno.EINTR:
                    raise
                # Interrupted by a signal: select() reported nothing, so
                # the input lists must not be mistaken for results.
                r, w = [], []

            # Scan child sockets and tend to those that need attention.
            for child in r:
                # Receive status byte.
                try:
                    state = child.recv(1)
                except socket.error:
                    if self._lastErrno() in (errno.EAGAIN, errno.EINTR):
                        # Guess it really didn't need attention?
                        continue
                    raise
                d = self._findChild(child)
                if d is None:
                    continue
                if state:
                    # Set availability status accordingly.
                    d['avail'] = state != '\x00'
                else:
                    # Didn't receive anything. Child is most likely
                    # dead.
                    self._closeChild(d)

            for child in w:
                d = self._findChild(child)
                if d is None:
                    # Its socket was closed while handling reads above.
                    continue
                # purging child
                try:
                    child.send('bye, bye')
                except socket.error:
                    # EPIPE: the child went away on its own; closing the
                    # socket below is all that is left to do.
                    if self._lastErrno() != errno.EPIPE:
                        raise
                self._last_purge = time.time()
                self._closeChild(d)
                # Only one child is dismissed per pass.
                break

            # Reap children.
            self._reapChildren()

            # See who and how many children are available.
            availPids = [pid for pid, d in self._children.items()
                         if d['avail']]
            avail = len(availPids)

            if avail < self._minSpare:
                # Need to spawn more children.
                while avail < self._minSpare and \
                      len(self._children) < self._maxChildren:
                    if not self._spawnChild(sock):
                        break
                    avail += 1
            elif avail > self._maxSpare:
                # Too many spares, kill off the extras. Closing the socket
                # is what tells an idle child to exit.
                availPids.sort()
                for pid in availPids[self._maxSpare:]:
                    self._closeChild(self._children[pid])

        # Clean up all child processes.
        self._cleanupChildren()

        # Restore signal handlers.
        self._restoreSignalHandlers()

        # Return bool based on whether or not SIGHUP was received.
        return self._hupReceived

    def _lastErrno(self):
        """
        Return args[0] (the errno for OSError, socket.error and
        select.error) of the exception currently being handled, or None
        if it carries no arguments.

        Must be called from inside an except block. It goes through
        sys.exc_info() so handlers need no exception-binding clause,
        whose spelling differs between interpreter versions.
        """
        args = sys.exc_info()[1].args
        if args:
            return args[0]
        return None

    def _findChild(self, f):
        """Return the state dictionary whose 'file' is f, or None."""
        for d in self._children.values():
            if d['file'] is f:
                return d
        return None

    def _closeChild(self, d):
        """
        Close the socket of the child described by d (if still open) and
        mark the child as unavailable. The socket is also dropped from the
        purge list so a closed socket is never handed to select().
        """
        f = d['file']
        if f is not None:
            f.close()
            try:
                self._children_to_purge.remove(f)
            except ValueError:
                pass
        d['file'] = None
        d['avail'] = False

    def _cleanupChildren(self):
        """
        Closes all child sockets (letting those that are available know
        that it's time to exit). Sends SIGINT to those that are currently
        processing (and hopes that it finishes ASAP).

        Any children remaining after 10 seconds is SIGKILLed.
        """
        # Let all children know it's time to go.
        for pid, d in list(self._children.items()):
            busy = not d['avail']
            self._closeChild(d)
            if busy:
                # Child is unavailable. SIGINT it.
                try:
                    os.kill(pid, signal.SIGINT)
                except OSError:
                    if self._lastErrno() != errno.ESRCH:
                        raise

        def alrmHandler(signum, frame):
            pass

        # Set up alarm to wake us up after 10 seconds.
        oldSIGALRM = signal.getsignal(signal.SIGALRM)
        signal.signal(signal.SIGALRM, alrmHandler)
        signal.alarm(10)

        try:
            # Wait for all children to die.
            while self._children:
                try:
                    pid, status = os.wait()
                except OSError:
                    # ECHILD: nothing left to wait for. EINTR: a signal
                    # (such as the alarm) interrupted the wait.
                    if self._lastErrno() in (errno.ECHILD, errno.EINTR):
                        break
                    raise
                if pid in self._children:
                    del self._children[pid]
        finally:
            # Cancel the alarm before putting the old handler back;
            # otherwise a still-pending SIGALRM is delivered later under
            # the restored disposition.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldSIGALRM)

        # Forcefully kill any remaining children.
        for pid in list(self._children.keys()):
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError:
                if self._lastErrno() != errno.ESRCH:
                    raise

    def _reapChildren(self):
        """Cleans up self._children whenever children die."""
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except OSError:
                if self._lastErrno() == errno.ECHILD:
                    break
                raise
            if pid <= 0:
                break
            if pid in self._children: # Sanity check.
                self._closeChild(self._children[pid])
                del self._children[pid]

    def _spawnChild(self, sock):
        """
        Spawn a single child. Returns True if successful, False otherwise.
        """
        # This socket pair is used for very simple communication between
        # the parent and its children. 'parent' is the end the child uses
        # to reach the parent; 'child' is the end the parent keeps.
        parent, child = socket.socketpair()
        parent.setblocking(0)
        setCloseOnExec(parent)
        child.setblocking(0)
        setCloseOnExec(child)
        try:
            pid = os.fork()
        except OSError:
            code = self._lastErrno()
            # No child was created, so neither end is of any use.
            parent.close()
            child.close()
            if code in (errno.EAGAIN, errno.ENOMEM):
                return False # Can't fork anymore.
            raise
        if not pid:
            # Child
            child.close()
            # Put child into its own process group.
            pid = os.getpid()
            os.setpgid(pid, pid)
            # Restore signal handlers.
            self._restoreSignalHandlers()
            # Close copies of child sockets.
            for f in [x['file'] for x in self._children.values()
                      if x['file'] is not None]:
                f.close()
            self._children = {}
            self._children_to_purge = []
            try:
                # Enter main loop.
                self._child(sock, parent)
            except KeyboardInterrupt:
                pass
            sys.exit(0)
        else:
            # Parent
            parent.close()
            d = self._children[pid] = {}
            d['file'] = child
            d['avail'] = True
            return True

    def _isClientAllowed(self, addr):
        """Override to provide access control."""
        return True

    def _notifyParent(self, parent, msg):
        """Send message to parent, ignoring EPIPE and retrying on EAGAIN"""
        while True:
            try:
                parent.send(msg)
                return True
            except socket.error:
                code = self._lastErrno()
                if code == errno.EPIPE:
                    return False # Parent is gone
                if code == errno.EAGAIN:
                    # Wait for socket change before sending again
                    select.select([], [parent], [])
                else:
                    raise

    def _child(self, sock, parent):
        """
        Main loop for children.

        Accepts connections on sock and runs one job per connection,
        sending '\\x00' to the parent before each job and '\\xff' after
        it. Returns on any activity on the parent socket, when the parent
        is gone, or once maxRequests (if positive) jobs were served.
        """
        requestCount = 0

        # Re-seed random module
        preseed = ''
        # urandom only exists in Python >= 2.4
        if hasattr(os, 'urandom'):
            try:
                preseed = os.urandom(16)
            except NotImplementedError:
                pass
        # Have doubts about this. random.seed will just hash the string
        random.seed('%s%s%s' % (preseed, os.getpid(), time.time()))
        del preseed

        while True:
            # Wait for any activity on the main socket or parent socket.
            r, w, e = select.select([sock, parent], [], [])

            for f in r:
                # If there's any activity on the parent socket, it
                # means the parent wants us to die or has died itself.
                # Either way, exit.
                if f is parent:
                    return

            # Otherwise, there's activity on the main socket...
            try:
                clientSock, addr = sock.accept()
            except socket.error:
                if self._lastErrno() == errno.EAGAIN:
                    # Or maybe not.
                    continue
                raise

            setCloseOnExec(clientSock)

            # Check if this client is allowed.
            if not self._isClientAllowed(addr):
                clientSock.close()
                continue

            # Notify parent we're no longer available.
            self._notifyParent(parent, '\x00')

            # Do the job.
            self._jobClass(clientSock, addr, *self._jobArgs).run()

            # If we've serviced the maximum number of requests, exit.
            if self._maxRequests > 0:
                requestCount += 1
                if requestCount >= self._maxRequests:
                    break

            # Tell parent we're free again.
            if not self._notifyParent(parent, '\xff'):
                return # Parent is gone.

    # Signal handlers

    def _hupHandler(self, signum, frame):
        """SIGHUP: stop the main loop and make run() return True."""
        self._keepGoing = False
        self._hupReceived = True

    def _intHandler(self, signum, frame):
        """SIGINT/SIGTERM: stop the main loop."""
        self._keepGoing = False

    def _chldHandler(self, signum, frame):
        """Not installed by _installSignalHandlers()."""
        # Do nothing (breaks us out of select and allows us to reap children).
        pass

    def _usr1Handler(self, signum, frame):
        """SIGUSR1: schedule every child with an open socket for purging."""
        self._children_to_purge = [x['file'] for x in self._children.values()
                                   if x['file'] is not None]

    def _installSignalHandlers(self):
        """
        Install this server's handlers for SIGINT and SIGTERM, plus
        SIGHUP and SIGUSR1 where the platform defines them, remembering
        the previous handlers in self._oldSIGs.
        """
        handlers = [(signal.SIGINT, self._intHandler),
                    (signal.SIGTERM, self._intHandler)]
        if hasattr(signal, 'SIGHUP'):
            handlers.append((signal.SIGHUP, self._hupHandler))
        if hasattr(signal, 'SIGUSR1'):
            handlers.append((signal.SIGUSR1, self._usr1Handler))

        self._oldSIGs = [(sig, signal.getsignal(sig))
                         for sig, handler in handlers]

        for sig, handler in handlers:
            signal.signal(sig, handler)

    def _restoreSignalHandlers(self):
        """Restores previous signal handlers."""
        for signum, handler in self._oldSIGs:
            signal.signal(signum, handler)
if __name__ == '__main__':
    class TestJob(object):
        """Demo job: greet the client, wait for one byte, then hang up."""
        def __init__(self, sock, addr):
            self._sock = sock
            self._addr = addr

        def run(self):
            # Parenthesised single-argument form: prints the same text
            # whether print is a statement or a function.
            print("Client connection opened from %s:%d" % self._addr)
            self._sock.send('Hello World!\n')
            # Blocking mode, so that recv() waits for the client.
            self._sock.setblocking(1)
            self._sock.recv(1)
            self._sock.close()
            print("Client connection closed from %s:%d" % self._addr)

    # Listen on port 8080 on all interfaces and serve with up to 10 children.
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 8080))
    sock.listen(socket.SOMAXCONN)
    PreforkServer(maxChildren=10, jobClass=TestJob).run(sock)