summaryrefslogtreecommitdiffstats
path: root/tester/rt/tftpy/TftpServer.py
diff options
context:
space:
mode:
Diffstat (limited to 'tester/rt/tftpy/TftpServer.py')
-rw-r--r--tester/rt/tftpy/TftpServer.py254
1 files changed, 254 insertions, 0 deletions
diff --git a/tester/rt/tftpy/TftpServer.py b/tester/rt/tftpy/TftpServer.py
new file mode 100644
index 0000000..07c2107
--- /dev/null
+++ b/tester/rt/tftpy/TftpServer.py
@@ -0,0 +1,254 @@
+"""This module implements the TFTP Server functionality. Instantiate an
+instance of the server, and then run the listen() method to listen for client
+requests. Logging is performed via a standard logging object set in
+TftpShared."""
+
+from __future__ import absolute_import, division, print_function, unicode_literals
+import socket, os, time
+import select
+import threading
+from errno import EINTR
+from .TftpShared import *
+from .TftpPacketTypes import *
+from .TftpPacketFactory import TftpPacketFactory
+from .TftpContexts import TftpContextServer
+
+class TftpServer(TftpSession):
+ """This class implements a tftp server object. Run the listen() method to
+ listen for client requests.
+
+ tftproot is the path to the tftproot directory to serve files from and/or
+ write them to.
+
+ dyn_file_func is a callable that takes a requested download
+ path that is not present on the file system and must return either a
+ file-like object to read from or None if the path should appear as not
+ found. This permits the serving of dynamic content.
+
+ upload_open is a callable that is triggered on every upload with the
+ requested destination path and server context. It must either return a
+ file-like object ready for writing or None if the path is invalid."""
+
+ def __init__(self,
+ tftproot='/tftpboot',
+ dyn_file_func=None,
+ upload_open=None):
+ self.listenip = None
+ self.listenport = None
+ self.sock = None
+ # FIXME: What about multiple roots?
+ self.root = os.path.abspath(tftproot)
+ self.dyn_file_func = dyn_file_func
+ self.upload_open = upload_open
+ # A dict of sessions, where each session is keyed by a string like
+ # ip:tid for the remote end.
+ self.sessions = {}
+ # A threading event to help threads synchronize with the server
+ # is_running state.
+ self.is_running = threading.Event()
+
+ self.shutdown_gracefully = False
+ self.shutdown_immediately = False
+
+ for name in 'dyn_file_func', 'upload_open':
+ attr = getattr(self, name)
+ if attr and not callable(attr):
+ raise TftpException, "%s supplied, but it is not callable." % (
+ name,)
+ if os.path.exists(self.root):
+ log.debug("tftproot %s does exist", self.root)
+ if not os.path.isdir(self.root):
+ raise TftpException("The tftproot must be a directory.")
+ else:
+ log.debug("tftproot %s is a directory" % self.root)
+ if os.access(self.root, os.R_OK):
+ log.debug("tftproot %s is readable" % self.root)
+ else:
+ raise TftpException("The tftproot must be readable")
+ if os.access(self.root, os.W_OK):
+ log.debug("tftproot %s is writable" % self.root)
+ else:
+ log.warning("The tftproot %s is not writable" % self.root)
+ else:
+ raise TftpException("The tftproot does not exist.")
+
+ def listen(self, listenip="", listenport=DEF_TFTP_PORT,
+ timeout=SOCK_TIMEOUT):
+ """Start a server listening on the supplied interface and port. This
+ defaults to INADDR_ANY (all interfaces) and UDP port 69. You can also
+ supply a different socket timeout value, if desired."""
+ tftp_factory = TftpPacketFactory()
+
+ # Don't use new 2.5 ternary operator yet
+ # listenip = listenip if listenip else '0.0.0.0'
+ if not listenip: listenip = '0.0.0.0'
+ log.info("Server requested on ip %s, port %s" % (listenip, listenport))
+ try:
+ # FIXME - sockets should be non-blocking
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.sock.bind((listenip, listenport))
+ _, self.listenport = self.sock.getsockname()
+ except socket.error as err:
+ # Reraise it for now.
+ raise err
+
+ self.is_running.set()
+
+ log.info("Starting receive loop...")
+ while True:
+ log.debug("shutdown_immediately is %s" % self.shutdown_immediately)
+ log.debug("shutdown_gracefully is %s" % self.shutdown_gracefully)
+ if self.shutdown_immediately:
+ log.warn("Shutting down now. Session count: %d" %
+ len(self.sessions))
+ self.sock.close()
+ for key in self.sessions:
+ self.sessions[key].end()
+ self.sessions = []
+ break
+
+ elif self.shutdown_gracefully:
+ if not self.sessions:
+ log.warn("In graceful shutdown mode and all "
+ "sessions complete.")
+ self.sock.close()
+ break
+
+ # Build the inputlist array of sockets to select() on.
+ inputlist = []
+ inputlist.append(self.sock)
+ for key in self.sessions:
+ inputlist.append(self.sessions[key].sock)
+
+ # Block until some socket has input on it.
+ log.debug("Performing select on this inputlist: %s", inputlist)
+ try:
+ readyinput, readyoutput, readyspecial = \
+ select.select(inputlist, [], [], timeout)
+ except select.error as err:
+ if err[0] == EINTR:
+ # Interrupted system call
+ log.debug("Interrupted syscall, retrying")
+ continue
+ else:
+ raise
+
+ deletion_list = []
+
+ # Handle the available data, if any. Maybe we timed-out.
+ for readysock in readyinput:
+ # Is the traffic on the main server socket? ie. new session?
+ if readysock == self.sock:
+ log.debug("Data ready on our main socket")
+ buffer, (raddress, rport) = self.sock.recvfrom(MAX_BLKSIZE)
+
+ log.debug("Read %d bytes", len(buffer))
+
+ if self.shutdown_gracefully:
+ log.warn("Discarding data on main port, "
+ "in graceful shutdown mode")
+ continue
+
+ # Forge a session key based on the client's IP and port,
+ # which should safely work through NAT.
+ key = "%s:%s" % (raddress, rport)
+
+ if not key in self.sessions:
+ log.debug("Creating new server context for "
+ "session key = %s" % key)
+ self.sessions[key] = TftpContextServer(raddress,
+ rport,
+ timeout,
+ self.root,
+ self.dyn_file_func,
+ self.upload_open)
+ try:
+ self.sessions[key].start(buffer)
+ except TftpException as err:
+ deletion_list.append(key)
+ log.error("Fatal exception thrown from "
+ "session %s: %s" % (key, str(err)))
+ else:
+ log.warn("received traffic on main socket for "
+ "existing session??")
+ log.info("Currently handling these sessions:")
+ for session_key, session in self.sessions.items():
+ log.info(" %s" % session)
+
+ else:
+ # Must find the owner of this traffic.
+ for key in self.sessions:
+ if readysock == self.sessions[key].sock:
+ log.debug("Matched input to session key %s"
+ % key)
+ try:
+ self.sessions[key].cycle()
+ if self.sessions[key].state == None:
+ log.info("Successful transfer.")
+ deletion_list.append(key)
+ except TftpException as err:
+ deletion_list.append(key)
+ log.error("Fatal exception thrown from "
+ "session %s: %s"
+ % (key, str(err)))
+ # Break out of for loop since we found the correct
+ # session.
+ break
+ else:
+ log.error("Can't find the owner for this packet. "
+ "Discarding.")
+
+ log.debug("Looping on all sessions to check for timeouts")
+ now = time.time()
+ for key in self.sessions:
+ try:
+ self.sessions[key].checkTimeout(now)
+ except TftpTimeout as err:
+ log.error(str(err))
+ self.sessions[key].retry_count += 1
+ if self.sessions[key].retry_count >= TIMEOUT_RETRIES:
+ log.debug("hit max retries on %s, giving up" %
+ self.sessions[key])
+ deletion_list.append(key)
+ else:
+ log.debug("resending on session %s" % self.sessions[key])
+ self.sessions[key].state.resendLast()
+
+ log.debug("Iterating deletion list.")
+ for key in deletion_list:
+ log.info('')
+ log.info("Session %s complete" % key)
+ if key in self.sessions:
+ log.debug("Gathering up metrics from session before deleting")
+ self.sessions[key].end()
+ metrics = self.sessions[key].metrics
+ if metrics.duration == 0:
+ log.info("Duration too short, rate undetermined")
+ else:
+ log.info("Transferred %d bytes in %.2f seconds"
+ % (metrics.bytes, metrics.duration))
+ log.info("Average rate: %.2f kbps" % metrics.kbps)
+ log.info("%.2f bytes in resent data" % metrics.resent_bytes)
+ log.info("%d duplicate packets" % metrics.dupcount)
+ log.debug("Deleting session %s" % key)
+ del self.sessions[key]
+ log.debug("Session list is now %s" % self.sessions)
+ else:
+ log.warn(
+ "Strange, session %s is not on the deletion list" % key)
+
+ self.is_running.clear()
+
+ log.debug("server returning from while loop")
+ self.shutdown_gracefully = self.shutdown_immediately = False
+
+ def stop(self, now=False):
+ """Stop the server gracefully. Do not take any new transfers,
+ but complete the existing ones. If force is True, drop everything
+ and stop. Note, immediately will not interrupt the select loop, it
+ will happen when the server returns on ready data, or a timeout.
+ ie. SOCK_TIMEOUT"""
+ if now:
+ self.shutdown_immediately = True
+ else:
+ self.shutdown_gracefully = True