Do not assume /dev/loop0 and /dev/loop1 are available.
I do a lot of live image making, which results in those loopback devices
sometimes not being available. Luckily, losetup will tell us the name of
the next free device.
Also fix up some imports so they're in the right place, while I'm digging
around in these files.
---
tests/storage_test/devicelibs_test/baseclass.py | 41 +++++++++++++++----
tests/storage_test/devicelibs_test/crypto_test.py | 46 ++++++++++----------
tests/storage_test/devicelibs_test/lvm_test.py | 31 +++++++-------
tests/storage_test/devicelibs_test/mdraid_test.py | 19 +++++----
tests/storage_test/devicelibs_test/mpath_test.py | 4 +-
tests/storage_test/devicelibs_test/swap_test.py | 31 +++++++-------
6 files changed, 99 insertions(+), 73 deletions(-)
mode change 100644 => 100755 tests/storage_test/devicelibs_test/crypto_test.py
mode change 100644 => 100755 tests/storage_test/devicelibs_test/lvm_test.py
mode change 100644 => 100755 tests/storage_test/devicelibs_test/mdraid_test.py
mode change 100644 => 100755 tests/storage_test/devicelibs_test/mpath_test.py
mode change 100644 => 100755 tests/storage_test/devicelibs_test/swap_test.py
diff --git a/tests/storage_test/devicelibs_test/baseclass.py b/tests/storage_test/devicelibs_test/baseclass.py
index c19bfc3..65c74e8 100644
--- a/tests/storage_test/devicelibs_test/baseclass.py
+++ b/tests/storage_test/devicelibs_test/baseclass.py
@@ -8,7 +8,7 @@ def makeLoopDev(device_name, file_name):
"bs=1024", "count=102400"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while True:
- proc.communicate()
+ (out, err) = proc.communicate()
if proc.returncode is not None:
rc = proc.returncode
break
@@ -18,7 +18,7 @@ def makeLoopDev(device_name, file_name):
proc = subprocess.Popen(["losetup", device_name, file_name],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while True:
- proc.communicate()
+ (out, err) = proc.communicate()
if proc.returncode is not None:
rc = proc.returncode
break
@@ -29,7 +29,7 @@ def removeLoopDev(device_name, file_name):
proc = subprocess.Popen(["losetup", "-d", device_name],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while True:
- proc.communicate()
+ (out, err) = proc.communicate()
if proc.returncode is not None:
rc = proc.returncode
break
@@ -38,18 +38,43 @@ def removeLoopDev(device_name, file_name):
os.unlink(file_name)
+def getFreeLoopDev():
+ # There's a race condition here where another process could grab the loop
+ # device losetup gives us before we have time to set it up, but that's just
+ # a chance we'll have to take.
+ proc = subprocess.Popen(["losetup", "-f"],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ out = None
+
+ while True:
+ (out, err) = proc.communicate()
+ if proc.returncode is not None:
+ rc = proc.returncode
+ out = out.strip()
+ break
+
+ if rc:
+ raise OSError, "losetup failed to find a free device"
+
+ return out
class DevicelibsTestCase(unittest.TestCase):
- _LOOP_DEVICES = (("/dev/loop0", "/tmp/test-virtdev0"),
- ("/dev/loop1", "/tmp/test-virtdev1"))
+ _LOOP_DEVICES = ["/tmp/test-virtdev0", "/tmp/test-virtdev1"]
+
+ def __init__(self, *args, **kwargs):
+ import anaconda_log
+ anaconda_log.init()
- ((_LOOP_DEV0, _LOOP_FILE0), (_LOOP_DEV1, _LOOP_FILE1)) = _LOOP_DEVICES
+ unittest.TestCase.__init__(self, *args, **kwargs)
+ self._loopMap = {}
def setUp(self):
- for dev, file in self._LOOP_DEVICES:
+ for file in self._LOOP_DEVICES:
+ dev = getFreeLoopDev()
makeLoopDev(dev, file)
+ self._loopMap[file] = dev
def tearDown(self):
- for dev, file in self._LOOP_DEVICES:
+ for (file, dev) in self._loopMap.iteritems():
removeLoopDev(dev, file)
diff --git a/tests/storage_test/devicelibs_test/crypto_test.py b/tests/storage_test/devicelibs_test/crypto_test.py
old mode 100644
new mode 100755
index 434fdcb..dd05af2
--- a/tests/storage_test/devicelibs_test/crypto_test.py
+++ b/tests/storage_test/devicelibs_test/crypto_test.py
@@ -7,24 +7,24 @@ import os
class CryptoTestCase(baseclass.DevicelibsTestCase):
- def setUp(self):
- baseclass.DevicelibsTestCase.setUp(self)
- import storage.devicelibs.crypto as crypto
+ def testCrypto(self):
+ _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
+ _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
+ import storage.devicelibs.crypto as crypto
- def testCrypto(self):
##
## is_luks
##
# pass
- self.assertEqual(crypto.is_luks(self._LOOP_DEV0), -22)
+ self.assertEqual(crypto.is_luks(_LOOP_DEV0), -22)
self.assertEqual(crypto.is_luks("/not/existing/device"), -22)
##
## luks_format
##
# pass
- self.assertEqual(crypto.luks_format(self._LOOP_DEV 0, passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256), None)
+ self.assertEqual(crypto.luks_format(_LOOP_DEV0, passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256), None)
# make a key file
handle, keyfile = tempfile.mkstemp(prefix="key", text=False)
@@ -32,25 +32,25 @@ class CryptoTestCase(baseclass.DevicelibsTestCase):
os.close(handle)
# format with key file
- self.assertEqual(crypto.luks_format(self._LOOP_DEV 1, key_file=keyfile), None)
+ self.assertEqual(crypto.luks_format(_LOOP_DEV1, key_file=keyfile), None)
# fail
self.assertRaises(crypto.CryptoError, crypto.luks_format, "/not/existing/device", passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256)
# no passhprase or key file
- self.assertRaises(ValueError, crypto.luks_format, self._LOOP_DEV1, cipher="aes-cbc-essiv:sha256", key_size=256)
+ self.assertRaises(ValueError, crypto.luks_format, _LOOP_DEV1, cipher="aes-cbc-essiv:sha256", key_size=256)
##
## is_luks
##
# pass
- self.assertEqual(crypto.is_luks(self._LOOP_DEV0), 0) # 0 = is luks
- self.assertEqual(crypto.is_luks(self._LOOP_DEV1), 0)
+ self.assertEqual(crypto.is_luks(_LOOP_DEV0), 0) # 0 = is luks
+ self.assertEqual(crypto.is_luks(_LOOP_DEV1), 0)
##
## luks_add_key
##
# pass
- self.assertEqual(crypto.luks_add_key(self._LOOP_DE V0, new_passphrase="another-secret", passphrase="secret"), None)
+ self.assertEqual(crypto.luks_add_key(_LOOP_DEV0, new_passphrase="another-secret", passphrase="secret"), None)
# make another key file
handle, new_keyfile = tempfile.mkstemp(prefix="key", text=False)
@@ -58,35 +58,35 @@ class CryptoTestCase(baseclass.DevicelibsTestCase):
os.close(handle)
# add new key file
- self.assertEqual(crypto.luks_add_key(self._LOOP_DE V1, new_key_file=new_keyfile, key_file=keyfile), None)
+ self.assertEqual(crypto.luks_add_key(_LOOP_DEV1, new_key_file=new_keyfile, key_file=keyfile), None)
# fail
- self.assertRaises(RuntimeError, crypto.luks_add_key, self._LOOP_DEV0, new_passphrase="another-secret", passphrase="wrong-passphrase")
+ self.assertRaises(crypto.CryptoError, crypto.luks_add_key, _LOOP_DEV0, new_passphrase="another-secret", passphrase="wrong-passphrase")
##
## luks_remove_key
##
# fail
- self.assertRaises(RuntimeError, crypto.luks_remove_key, self._LOOP_DEV0, del_passphrase="another-secret", passphrase="wrong-pasphrase")
+ self.assertRaises(RuntimeError, crypto.luks_remove_key, _LOOP_DEV0, del_passphrase="another-secret", passphrase="wrong-pasphrase")
# pass
- self.assertEqual(crypto.luks_remove_key(self._LOOP _DEV0, del_passphrase="another-secret", passphrase="secret"), None)
+ self.assertEqual(crypto.luks_remove_key(_LOOP_DEV0 , del_passphrase="another-secret", passphrase="secret"), None)
# remove key file
- self.assertEqual(crypto.luks_remove_key(self._LOOP _DEV1, del_key_file=new_keyfile, key_file=keyfile), None)
+ self.assertEqual(crypto.luks_remove_key(LOOP_DEV1, del_key_file=new_keyfile, key_file=keyfile), None)
##
## luks_open
##
# pass
- self.assertEqual(crypto.luks_open(self._LOOP_DEV0, "crypted", passphrase="secret"), None)
- self.assertEqual(crypto.luks_open(self._LOOP_DEV1, "encrypted", key_file=keyfile), None)
+ self.assertEqual(crypto.luks_open(_LOOP_DEV0, "crypted", passphrase="secret"), None)
+ self.assertEqual(crypto.luks_open(_LOOP_DEV1, "encrypted", key_file=keyfile), None)
# fail
self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", passphrase="secret")
self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", key_file=keyfile)
# no passhprase or key file
- self.assertRaises(ValueError, crypto.luks_open, self._LOOP_DEV1, "another-crypted")
+ self.assertRaises(ValueError, crypto.luks_open, _LOOP_DEV1, "another-crypted")
##
## luks_status
@@ -100,10 +100,10 @@ class CryptoTestCase(baseclass.DevicelibsTestCase):
## luks_uuid
##
# pass
- uuid = crypto.luks_uuid(self._LOOP_DEV0)
- self.assertEqual(crypto.luks_uuid(self._LOOP_DEV0) , uuid)
- uuid = crypto.luks_uuid(self._LOOP_DEV1)
- self.assertEqual(crypto.luks_uuid(self._LOOP_DEV1) , uuid)
+ uuid = crypto.luks_uuid(_LOOP_DEV0)
+ self.assertEqual(crypto.luks_uuid(_LOOP_DEV0), uuid)
+ uuid = crypto.luks_uuid(_LOOP_DEV1)
+ self.assertEqual(crypto.luks_uuid(_LOOP_DEV1), uuid)
##
## luks_close
diff --git a/tests/storage_test/devicelibs_test/lvm_test.py b/tests/storage_test/devicelibs_test/lvm_test.py
old mode 100644
new mode 100755
index 734644d..e81e529
--- a/tests/storage_test/devicelibs_test/lvm_test.py
+++ b/tests/storage_test/devicelibs_test/lvm_test.py
@@ -4,16 +4,17 @@ import unittest
class LVMTestCase(baseclass.DevicelibsTestCase):
- def setUp(self):
- baseclass.DevicelibsTestCase.setUp(self)
+ def testLVM(self):
+ _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
+ _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
+
import storage.devicelibs.lvm as lvm
- def testLVM(self):
##
## pvcreate
##
# pass
- for dev, file in self._LOOP_DEVICES:
+ for dev, file in self._loopMap.iteritems():
self.assertEqual(lvm.pvcreate(dev), None)
# fail
@@ -23,7 +24,7 @@ class LVMTestCase(baseclass.DevicelibsTestCase):
## pvresize
##
# pass
- for dev, file in self._LOOP_DEVICES:
+ for dev, file in self._loopMap.iteritems():
self.assertEqual(lvm.pvresize(dev, 50), None)
self.assertEqual(lvm.pvresize(dev, 100), None)
@@ -34,21 +35,21 @@ class LVMTestCase(baseclass.DevicelibsTestCase):
## vgcreate
##
# pass
- self.assertEqual(lvm.vgcreate("test-vg", [self._LOOP_DEV0, self._LOOP_DEV1], 4), None)
+ self.assertEqual(lvm.vgcreate("test-vg", [_LOOP_DEV0, _LOOP_DEV1], 4), None)
# fail
self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", ["/not/existing/device"], 4)
# vg already exists
- self.assertRaises(lvm.LVMError, lvm.vgcreate, "test-vg", [self._LOOP_DEV0], 4)
+ self.assertRaises(lvm.LVMError, lvm.vgcreate, "test-vg", [_LOOP_DEV0], 4)
# pe size must be power of 2
- self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", [self._LOOP_DEV0], 5)
+ self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", [_LOOP_DEV0], 5)
##
## pvremove
##
# fail
# cannot remove pv now with vg created
- self.assertRaises(lvm.LVMError, lvm.pvremove, self._LOOP_DEV0)
+ self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0)
##
## vgdeactivate
@@ -63,10 +64,10 @@ class LVMTestCase(baseclass.DevicelibsTestCase):
## vgreduce
##
# pass
- self.assertEqual(lvm.vgreduce("test-vg", [self._LOOP_DEV1]), None)
+ self.assertEqual(lvm.vgreduce("test-vg", [_LOOP_DEV1]), None)
# fail
- self.assertRaises(lvm.LVMError, lvm.vgreduce, "wrong-vg-name", [self._LOOP_DEV1])
+ self.assertRaises(lvm.LVMError, lvm.vgreduce, "wrong-vg-name", [_LOOP_DEV1])
self.assertRaises(lvm.LVMError, lvm.vgreduce, "test-vg", ["/not/existing/device"])
##
@@ -82,9 +83,9 @@ class LVMTestCase(baseclass.DevicelibsTestCase):
## pvinfo
##
# pass
- self.assertEqual(lvm.pvinfo(self._LOOP_DEV0)["pv_name"], self._LOOP_DEV0)
+ self.assertEqual(lvm.pvinfo(_LOOP_DEV0)["pv_name"], _LOOP_DEV0)
# no vg
- self.assertEqual(lvm.pvinfo(self._LOOP_DEV1)["pv_name"], self._LOOP_DEV1)
+ self.assertEqual(lvm.pvinfo(_LOOP_DEV1)["pv_name"], _LOOP_DEV1)
# fail
self.assertRaises(lvm.LVMError, lvm.pvinfo, "/not/existing/device")
@@ -189,13 +190,13 @@ class LVMTestCase(baseclass.DevicelibsTestCase):
## pvremove
##
# pass
- for dev, file in self._LOOP_DEVICES:
+ for dev, file in self._loopMap.iteritems():
self.assertEqual(lvm.pvremove(dev), None)
# fail
self.assertRaises(lvm.LVMError, lvm.pvremove, "/not/existing/device")
# pv already removed
- self.assertRaises(lvm.LVMError, lvm.pvremove, self._LOOP_DEV0)
+ self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0)
#def testGetPossiblePhysicalExtents(self):
# pass
diff --git a/tests/storage_test/devicelibs_test/mdraid_test.py b/tests/storage_test/devicelibs_test/mdraid_test.py
old mode 100644
new mode 100755
index 9dde22d..1f7849e
--- a/tests/storage_test/devicelibs_test/mdraid_test.py
+++ b/tests/storage_test/devicelibs_test/mdraid_test.py
@@ -5,11 +5,12 @@ import time
class MDRaidTestCase(baseclass.DevicelibsTestCase):
- def setUp(self):
- baseclass.DevicelibsTestCase.setUp(self)
+ def testMDRaid(self):
+ _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
+ _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
+
import storage.devicelibs.mdraid as mdraid
- def testMDRaid(self):
##
## getRaidLevels
##
@@ -28,7 +29,7 @@ class MDRaidTestCase(baseclass.DevicelibsTestCase):
# fail
# unsupported raid
- self.assertRaises(ValueError, mdraid.get_raid_min_members, 4)
+ self.assertRaises(ValueError, mdraid.get_raid_min_members, 8)
##
## get_raid_max_spares
@@ -42,13 +43,13 @@ class MDRaidTestCase(baseclass.DevicelibsTestCase):
# fail
# unsupported raid
- self.assertRaises(ValueError, mdraid.get_raid_max_spares, 4, 5)
+ self.assertRaises(ValueError, mdraid.get_raid_max_spares, 8, 5)
##
## mdcreate
##
# pass
- self.assertEqual(mdraid.mdcreate("/dev/md0", 1, [self._LOOP_DEV0, self._LOOP_DEV1]), None)
+ self.assertEqual(mdraid.mdcreate("/dev/md0", 1, [_LOOP_DEV0, _LOOP_DEV1]), None)
# wait for raid to settle
time.sleep(2)
@@ -77,7 +78,7 @@ class MDRaidTestCase(baseclass.DevicelibsTestCase):
## mdactivate
##
# pass
- self.assertEqual(mdraid.mdactivate("/dev/md0", [self._LOOP_DEV0, self._LOOP_DEV1], super_minor=0), None)
+ self.assertEqual(mdraid.mdactivate("/dev/md0", [_LOOP_DEV0, _LOOP_DEV1], super_minor=0), None)
# wait for raid to settle
time.sleep(2)
@@ -93,8 +94,8 @@ class MDRaidTestCase(baseclass.DevicelibsTestCase):
# deactivate first
self.assertEqual(mdraid.mddeactivate("/dev/md0"), None)
- self.assertEqual(mdraid.mddestroy(self._LOOP_DEV0) , None)
- self.assertEqual(mdraid.mddestroy(self._LOOP_DEV1) , None)
+ self.assertEqual(mdraid.mddestroy(_LOOP_DEV0), None)
+ self.assertEqual(mdraid.mddestroy(_LOOP_DEV1), None)
# fail
# not a component
diff --git a/tests/storage_test/devicelibs_test/mpath_test.py b/tests/storage_test/devicelibs_test/mpath_test.py
old mode 100644
new mode 100755
index 813a516..377b50d
--- a/tests/storage_test/devicelibs_test/mpath_test.py
+++ b/tests/storage_test/devicelibs_test/mpath_test.py
@@ -3,11 +3,9 @@ import baseclass
import unittest
class MPathTestCase(baseclass.DevicelibsTestCase):
- def setUp(self):
- baseclass.DevicelibsTestCase.setUp(self)
+ def testMPath(self):
import storage.devicelibs.mpath as mpath
- def testMPath(self):
##
## parseMultipathOutput
##
diff --git a/tests/storage_test/devicelibs_test/swap_test.py b/tests/storage_test/devicelibs_test/swap_test.py
old mode 100644
new mode 100755
index ddf8d21..68f778d
--- a/tests/storage_test/devicelibs_test/swap_test.py
+++ b/tests/storage_test/devicelibs_test/swap_test.py
@@ -4,16 +4,17 @@ import unittest
class SwapTestCase(baseclass.DevicelibsTestCase):
- def setUp(self):
- baseclass.DevicelibsTestCase.setUp(self)
+ def testSwap(self):
+ _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
+ _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
+
import storage.devicelibs.swap as swap
- def testSwap(self):
##
## mkswap
##
# pass
- self.assertEqual(swap.mkswap(self._LOOP_DEV0, "swap"), None)
+ self.assertEqual(swap.mkswap(_LOOP_DEV0, "swap"), None)
# fail
self.assertRaises(swap.SwapError, swap.mkswap, "/not/existing/device")
@@ -22,24 +23,24 @@ class SwapTestCase(baseclass.DevicelibsTestCase):
## swapon
##
# pass
- self.assertEqual(swap.swapon(self._LOOP_DEV0, 1), None)
+ self.assertEqual(swap.swapon(_LOOP_DEV0, 1), None)
# fail
self.assertRaises(swap.SwapError, swap.swapon, "/not/existing/device")
# not a swap partition
- self.assertRaises(swap.SwapError, swap.swapon, self._LOOP_DEV1)
+ self.assertRaises(swap.SwapError, swap.swapon, _LOOP_DEV1)
# pass
# make another swap
- self.assertEqual(swap.mkswap(self._LOOP_DEV1, "another-swap"), None)
- self.assertEqual(swap.swapon(self._LOOP_DEV1), None)
+ self.assertEqual(swap.mkswap(_LOOP_DEV1, "another-swap"), None)
+ self.assertEqual(swap.swapon(_LOOP_DEV1), None)
##
## swapstatus
##
# pass
- self.assertEqual(swap.swapstatus(self._LOOP_DEV0), True)
- self.assertEqual(swap.swapstatus(self._LOOP_DEV1), True)
+ self.assertEqual(swap.swapstatus(_LOOP_DEV0), True)
+ self.assertEqual(swap.swapstatus(_LOOP_DEV1), True)
# does not fail
self.assertEqual(swap.swapstatus("/not/existing/device"), False)
@@ -48,18 +49,18 @@ class SwapTestCase(baseclass.DevicelibsTestCase):
## swapoff
##
# pass
- self.assertEqual(swap.swapoff(self._LOOP_DEV1), None)
+ self.assertEqual(swap.swapoff(_LOOP_DEV1), None)
# check status
- self.assertEqual(swap.swapstatus(self._LOOP_DEV0), True)
- self.assertEqual(swap.swapstatus(self._LOOP_DEV1), False)
+ self.assertEqual(swap.swapstatus(_LOOP_DEV0), True)
+ self.assertEqual(swap.swapstatus(_LOOP_DEV1), False)
- self.assertEqual(swap.swapoff(self._LOOP_DEV0), None)
+ self.assertEqual(swap.swapoff(_LOOP_DEV0), None)
# fail
self.assertRaises(swap.SwapError, swap.swapoff, "/not/existing/device")
# already off
- self.assertRaises(swap.SwapError, swap.swapoff, self._LOOP_DEV0)
+ self.assertRaises(swap.SwapError, swap.swapoff, _LOOP_DEV0)
def suite():
--
1.7.0.1
_______________________________________________
Anaconda-devel-list mailing list
Anaconda-devel-list@redhat.com
https://www.redhat.com/mailman/listinfo/anaconda-devel-list