"""Unit tests for tftpy.""" import unittest import logging import tftpy import os import time log = tftpy.log class TestTftpyClasses(unittest.TestCase): def setUp(self): tftpy.setLogLevel(logging.DEBUG) def testTftpPacketRRQ(self): log.debug("===> Running testcase testTftpPacketRRQ") options = {} rrq = tftpy.TftpPacketRRQ() rrq.filename = 'myfilename' rrq.mode = 'octet' rrq.options = options rrq.encode() self.assert_(rrq.buffer != None, "Buffer populated") rrq.decode() self.assertEqual(rrq.filename, "myfilename", "Filename correct") self.assertEqual(rrq.mode, "octet", "Mode correct") self.assertEqual(rrq.options, options, "Options correct") # repeat test with options rrq.options = { 'blksize': '1024' } rrq.filename = 'myfilename' rrq.mode = 'octet' rrq.encode() self.assert_(rrq.buffer != None, "Buffer populated") rrq.decode() self.assertEqual(rrq.filename, "myfilename", "Filename correct") self.assertEqual(rrq.mode, "octet", "Mode correct") self.assertEqual(rrq.options['blksize'], '1024', "Blksize correct") def testTftpPacketWRQ(self): log.debug("===> Running test case testTftpPacketWRQ") options = {} wrq = tftpy.TftpPacketWRQ() wrq.filename = 'myfilename' wrq.mode = 'octet' wrq.options = options wrq.encode() self.assert_(wrq.buffer != None, "Buffer populated") wrq.decode() self.assertEqual(wrq.opcode, 2, "Opcode correct") self.assertEqual(wrq.filename, "myfilename", "Filename correct") self.assertEqual(wrq.mode, "octet", "Mode correct") self.assertEqual(wrq.options, options, "Options correct") # repeat test with options wrq.options = { 'blksize': '1024' } wrq.filename = 'myfilename' wrq.mode = 'octet' wrq.encode() self.assert_(wrq.buffer != None, "Buffer populated") wrq.decode() self.assertEqual(wrq.opcode, 2, "Opcode correct") self.assertEqual(wrq.filename, "myfilename", "Filename correct") self.assertEqual(wrq.mode, "octet", "Mode correct") self.assertEqual(wrq.options['blksize'], '1024', "Blksize correct") def testTftpPacketDAT(self): log.debug("===> Running testcase testTftpPacketDAT") dat = tftpy.TftpPacketDAT() dat.blocknumber = 5 data = "this is some data" dat.data = data dat.encode() self.assert_(dat.buffer != None, "Buffer populated") dat.decode() self.assertEqual(dat.opcode, 3, "DAT opcode is correct") self.assertEqual(dat.blocknumber, 5, "Block number is correct") self.assertEqual(dat.data, data, "DAT data is correct") def testTftpPacketACK(self): log.debug("===> Running testcase testTftpPacketACK") ack = tftpy.TftpPacketACK() ack.blocknumber = 6 ack.encode() self.assert_(ack.buffer != None, "Buffer populated") ack.decode() self.assertEqual(ack.opcode, 4, "ACK opcode is correct") self.assertEqual(ack.blocknumber, 6, "ACK blocknumber correct") def testTftpPacketERR(self): log.debug("===> Running testcase testTftpPacketERR") err = tftpy.TftpPacketERR() err.errorcode = 4 err.encode() self.assert_(err.buffer != None, "Buffer populated") err.decode() self.assertEqual(err.opcode, 5, "ERR opcode is correct") self.assertEqual(err.errorcode, 4, "ERR errorcode is correct") def testTftpPacketOACK(self): log.debug("===> Running testcase testTftpPacketOACK") oack = tftpy.TftpPacketOACK() # Test that if we make blksize a number, it comes back a string. oack.options = { 'blksize': 2048 } oack.encode() self.assert_(oack.buffer != None, "Buffer populated") oack.decode() self.assertEqual(oack.opcode, 6, "OACK opcode is correct") self.assertEqual(oack.options['blksize'], '2048', "OACK blksize option is correct") # Test string to string oack.options = { 'blksize': '4096' } oack.encode() self.assert_(oack.buffer != None, "Buffer populated") oack.decode() self.assertEqual(oack.opcode, 6, "OACK opcode is correct") self.assertEqual(oack.options['blksize'], '4096', "OACK blksize option is correct") def testTftpPacketFactory(self): log.debug("===> Running testcase testTftpPacketFactory") # Make sure that the correct class is created for the correct opcode. classes = { 1: tftpy.TftpPacketRRQ, 2: tftpy.TftpPacketWRQ, 3: tftpy.TftpPacketDAT, 4: tftpy.TftpPacketACK, 5: tftpy.TftpPacketERR, 6: tftpy.TftpPacketOACK } factory = tftpy.TftpPacketFactory() for opcode in classes: self.assert_(isinstance(factory._TftpPacketFactory__create(opcode), classes[opcode]), "opcode %d returns the correct class" % opcode) class TestTftpyState(unittest.TestCase): def setUp(self): tftpy.setLogLevel(logging.DEBUG) def clientServerUploadOptions(self, options, transmitname=None): """Fire up a client and a server and do an upload.""" root = '/tmp' home = os.path.dirname(os.path.abspath(__file__)) filename = '100KBFILE' input_path = os.path.join(home, filename) if transmitname: filename = transmitname server = tftpy.TftpServer(root) client = tftpy.TftpClient('localhost', 20001, options) # Fork a server and run the client in this process. child_pid = os.fork() if child_pid: # parent - let the server start try: time.sleep(1) client.upload(filename, input_path) finally: os.kill(child_pid, 15) os.waitpid(child_pid, 0) else: server.listen('localhost', 20001) def clientServerDownloadOptions(self, options): """Fire up a client and a server and do a download.""" root = os.path.dirname(os.path.abspath(__file__)) server = tftpy.TftpServer(root) client = tftpy.TftpClient('localhost', 20001, options) # Fork a server and run the client in this process. child_pid = os.fork() if child_pid: # parent - let the server start try: time.sleep(1) client.download('100KBFILE', '/tmp/out') finally: os.kill(child_pid, 15) os.waitpid(child_pid, 0) else: server.listen('localhost', 20001) def testClientServerNoOptions(self): self.clientServerDownloadOptions({}) def testClientServerBlksize(self): for blksize in [512, 1024, 2048, 4096]: self.clientServerDownloadOptions({'blksize': blksize}) def testClientServerUploadNoOptions(self): self.clientServerUploadOptions({}) def testClientServerUploadWithSubdirs(self): self.clientServerUploadOptions({}, transmitname='foo/bar/100KBFILE') def testClientServerUploadOptions(self): for blksize in [512, 1024, 2048, 4096]: self.clientServerUploadOptions({'blksize': blksize}) def testClientServerNoOptionsDelay(self): tftpy.TftpStates.DELAY_BLOCK = 10 self.clientServerDownloadOptions({}) tftpy.TftpStates.DELAY_BLOCK = 0 def testServerNoOptions(self): raddress = '127.0.0.2' rport = 10000 timeout = 5 root = os.path.dirname(os.path.abspath(__file__)) # Testing without the dyn_func_file set. serverstate = tftpy.TftpContextServer(raddress, rport, timeout, root) self.assertTrue( isinstance(serverstate, tftpy.TftpContextServer) ) rrq = tftpy.TftpPacketRRQ() rrq.filename = '100KBFILE' rrq.mode = 'octet' rrq.options = {} # Start the download. serverstate.start(rrq.encode().buffer) # At a 512 byte blocksize, this should be 200 packets exactly. for block in range(1, 201): # Should be in expectack state. self.assertTrue( isinstance(serverstate.state, tftpy.TftpStateExpectACK) ) ack = tftpy.TftpPacketACK() ack.blocknumber = block serverstate.state = serverstate.state.handle(ack, raddress, rport) # The last DAT packet should be empty, indicating a completed # transfer. ack = tftpy.TftpPacketACK() ack.blocknumber = 201 finalstate = serverstate.state.handle(ack, raddress, rport) self.assertTrue( finalstate is None ) def testServerNoOptionsSubdir(self): raddress = '127.0.0.2' rport = 10000 timeout = 5 root = os.path.dirname(os.path.abspath(__file__)) # Testing without the dyn_func_file set. serverstate = tftpy.TftpContextServer(raddress, rport, timeout, root) self.assertTrue( isinstance(serverstate, tftpy.TftpContextServer) ) rrq = tftpy.TftpPacketRRQ() rrq.filename = 'foo/100KBFILE' rrq.mode = 'octet' rrq.options = {} # Start the download. serverstate.start(rrq.encode().buffer) # At a 512 byte blocksize, this should be 200 packets exactly. for block in range(1, 201): # Should be in expectack state. self.assertTrue( isinstance(serverstate.state, tftpy.TftpStateExpectACK) ) ack = tftpy.TftpPacketACK() ack.blocknumber = block serverstate.state = serverstate.state.handle(ack, raddress, rport) # The last DAT packet should be empty, indicating a completed # transfer. ack = tftpy.TftpPacketACK() ack.blocknumber = 201 finalstate = serverstate.state.handle(ack, raddress, rport) self.assertTrue( finalstate is None ) def testServerInsecurePath(self): raddress = '127.0.0.2' rport = 10000 timeout = 5 root = os.path.dirname(os.path.abspath(__file__)) serverstate = tftpy.TftpContextServer(raddress, rport, timeout, root) rrq = tftpy.TftpPacketRRQ() rrq.filename = '../setup.py' rrq.mode = 'octet' rrq.options = {} # Start the download. self.assertRaises(tftpy.TftpException, serverstate.start, rrq.encode().buffer) def testServerSecurePath(self): raddress = '127.0.0.2' rport = 10000 timeout = 5 root = os.path.dirname(os.path.abspath(__file__)) serverstate = tftpy.TftpContextServer(raddress, rport, timeout, root) rrq = tftpy.TftpPacketRRQ() rrq.filename = '100KBFILE' rrq.mode = 'octet' rrq.options = {} # Start the download. serverstate.start(rrq.encode().buffer) # Should be in expectack state. self.assertTrue(isinstance(serverstate.state, tftpy.TftpStateExpectACK)) if __name__ == '__main__': unittest.main()