test: Python3 fixes & improve comments
This commit is contained in:
parent
e008689b9b
commit
1de548e62f
116
test_gcp.py
116
test_gcp.py
|
@ -1,9 +1,9 @@
|
||||||
#!/usr/bin/python
|
#!/usr/bin/env python3
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
gcp: Goffi's CoPier
|
gcp: Goffi's CoPier -- unit tests
|
||||||
Copyright (C) 2010, 2011 Jérôme Poisson <goffi@goffi.org>
|
Copyright (c) 2010, 2011 Jérôme Poisson <goffi@goffi.org>
|
||||||
|
Copyright (c) 2018 Matteo Cypriani <mcy@lm7.fr>
|
||||||
|
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
it under the terms of the GNU General Public License as published by
|
it under the terms of the GNU General Public License as published by
|
||||||
|
@ -19,7 +19,6 @@ You should have received a copy of the GNU General Public License
|
||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import with_statement
|
|
||||||
import tempfile
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
from os import getcwd, chdir, system, mkdir, makedirs, listdir
|
from os import getcwd, chdir, system, mkdir, makedirs, listdir
|
||||||
|
@ -28,20 +27,25 @@ from shutil import rmtree
|
||||||
from random import randrange
|
from random import randrange
|
||||||
from hashlib import sha1
|
from hashlib import sha1
|
||||||
|
|
||||||
#size shorcuts
|
# gcp command. This assumes gcp is in the PATH. Alternatively, we could use
|
||||||
|
# something like this to use gcp from the current directory:
|
||||||
|
#GCP = join(getcwd(), "gcp")
|
||||||
|
GCP = 'gcp'
|
||||||
|
|
||||||
|
# Size shorcuts
|
||||||
S10K = 1024 * 10
|
S10K = 1024 * 10
|
||||||
S100K = 1024 * 100
|
S100K = 1024 * 100
|
||||||
S1M = 1024 * 1024
|
S1M = 1024 * 1024
|
||||||
S10M = 1024 * 1024 * 10
|
S10M = 1024 * 1024 * 10
|
||||||
S100M = 1024 * 1024 * 100
|
S100M = 1024 * 1024 * 100
|
||||||
|
|
||||||
|
|
||||||
def sha1sum(filename, buf_size=4096):
|
def sha1sum(filename, buf_size=4096):
|
||||||
"""Return the SHA1 hash of a file
|
"""Returns the SHA1 hash of a file.
|
||||||
@param filename: path to the file
|
@param filename: path to the file
|
||||||
@param buf_size: size of the buffer to use for calculation"""
|
@param buf_size: size of the buffer to use for calculation
|
||||||
|
"""
|
||||||
csum = sha1()
|
csum = sha1()
|
||||||
with open(filename) as fd:
|
with open(filename, 'rb') as fd:
|
||||||
data = fd.read(buf_size)
|
data = fd.read(buf_size)
|
||||||
while data:
|
while data:
|
||||||
csum.update(data)
|
csum.update(data)
|
||||||
|
@ -49,7 +53,7 @@ def sha1sum(filename, buf_size=4096):
|
||||||
return csum.digest()
|
return csum.digest()
|
||||||
|
|
||||||
def dirCheck(dir_path):
|
def dirCheck(dir_path):
|
||||||
"""Recursively calculate SHA1 sum of a dir
|
"""Recursively calculates SHA1 sums in a directory.
|
||||||
@param path: path of the dir to check
|
@param path: path of the dir to check
|
||||||
@return: a dict in the form [{filepath: sum,...}]
|
@return: a dict in the form [{filepath: sum,...}]
|
||||||
"""
|
"""
|
||||||
|
@ -69,9 +73,10 @@ def dirCheck(dir_path):
|
||||||
return result
|
return result
|
||||||
|
|
||||||
#def makeRandomFile(path, size, buf_size=4096):
|
#def makeRandomFile(path, size, buf_size=4096):
|
||||||
# """Create a fake file
|
# """Creates a fake file.
|
||||||
# @param path: where the file is created
|
# @param path: where the file is created
|
||||||
# @param size: size of the file to create in bytes"""
|
# @param size: size of the file to create in bytes
|
||||||
|
# """
|
||||||
# def seq(size):
|
# def seq(size):
|
||||||
# return ''.join(chr(randrange(256)) for i in range(size))
|
# return ''.join(chr(randrange(256)) for i in range(size))
|
||||||
# fd = open(path, 'w')
|
# fd = open(path, 'w')
|
||||||
|
@ -81,20 +86,22 @@ def dirCheck(dir_path):
|
||||||
# fd.close()
|
# fd.close()
|
||||||
|
|
||||||
def makeRandomFile(path, size=S10K, buf_size=4096):
|
def makeRandomFile(path, size=S10K, buf_size=4096):
|
||||||
"""Create a fake file using /dev/urandom
|
"""Creates a fake file using /dev/urandom.
|
||||||
@param path: where the file must be created
|
@param path: where the file must be created
|
||||||
@param size: size of the file to create in bytes"""
|
@param size: size of the file to create in bytes
|
||||||
source = open('/dev/urandom','r')
|
"""
|
||||||
dest = open(path, 'w')
|
source = open('/dev/urandom', 'rb')
|
||||||
for byte in range(size//buf_size):
|
dest = open(path, 'wb')
|
||||||
|
for byte in range(size // buf_size):
|
||||||
dest.write(source.read(buf_size))
|
dest.write(source.read(buf_size))
|
||||||
dest.write(source.read(size%buf_size))
|
dest.write(source.read(size % buf_size))
|
||||||
dest.close()
|
dest.close()
|
||||||
|
source.close()
|
||||||
|
|
||||||
def makeTestDir(path):
|
def makeTestDir(path):
|
||||||
"""Helper method to easily create a test dir
|
"""Helper method to easily create a test directory.
|
||||||
@param path: where the dir must be created"""
|
@param path: where the dir must be created
|
||||||
|
"""
|
||||||
for i in range(2):
|
for i in range(2):
|
||||||
subdir = join(path,'subdir_%d' % i)
|
subdir = join(path,'subdir_%d' % i)
|
||||||
makedirs(subdir)
|
makedirs(subdir)
|
||||||
|
@ -104,8 +111,8 @@ def makeTestDir(path):
|
||||||
makeRandomFile(join(path,'file_%d' % i), S10K)
|
makeRandomFile(join(path,'file_%d' % i), S10K)
|
||||||
|
|
||||||
class TestCopyCases(unittest.TestCase):
|
class TestCopyCases(unittest.TestCase):
|
||||||
"""Test basic copy use cases, using gcp externally
|
"""Test basic copy use cases, using gcp externally.
|
||||||
gcp must be available in the PATH"""
|
"""
|
||||||
#TODO: check journal
|
#TODO: check journal
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
@ -118,106 +125,117 @@ class TestCopyCases(unittest.TestCase):
|
||||||
rmtree(self.tmp_dir)
|
rmtree(self.tmp_dir)
|
||||||
|
|
||||||
def test_one_file_copy(self):
|
def test_one_file_copy(self):
|
||||||
"""Copy one file and test the result"""
|
"""Copies one file and tests the result.
|
||||||
|
"""
|
||||||
makeRandomFile('file_1', S10K)
|
makeRandomFile('file_1', S10K)
|
||||||
ori_sum = sha1sum('file_1')
|
ori_sum = sha1sum('file_1')
|
||||||
ret = system("gcp file_1 file_2")
|
ret = system(GCP + " file_1 file_2")
|
||||||
self.assertEqual(ret,0)
|
self.assertEqual(ret,0)
|
||||||
dest_sum = sha1sum('file_2')
|
dest_sum = sha1sum('file_2')
|
||||||
self.assertEqual(ori_sum, dest_sum)
|
self.assertEqual(ori_sum, dest_sum)
|
||||||
|
|
||||||
def test_one_file_copy_already_exists(self):
|
def test_one_file_copy_already_exists(self):
|
||||||
"""Check that an existing file is not overwritten"""
|
"""Checks that an existing file is not overwritten.
|
||||||
|
"""
|
||||||
makeRandomFile('file_1', S10K)
|
makeRandomFile('file_1', S10K)
|
||||||
makeRandomFile('file_2', S10K)
|
makeRandomFile('file_2', S10K)
|
||||||
file_2_sum = sha1sum('file_2')
|
file_2_sum = sha1sum('file_2')
|
||||||
ret = system("gcp file_1 file_2")
|
ret = system(GCP + " file_1 file_2")
|
||||||
self.assertNotEqual(ret,0)
|
self.assertNotEqual(ret,0)
|
||||||
file_2_sum_bis = sha1sum('file_2')
|
file_2_sum_bis = sha1sum('file_2')
|
||||||
self.assertEqual(file_2_sum, file_2_sum_bis)
|
self.assertEqual(file_2_sum, file_2_sum_bis)
|
||||||
|
|
||||||
def test_one_file_copy_already_exists_force(self):
|
def test_one_file_copy_already_exists_force(self):
|
||||||
"""Check that an existing file is overwritten with --force"""
|
"""Checks that an existing file is overwritten with --force.
|
||||||
|
"""
|
||||||
makeRandomFile('file_1', S10K)
|
makeRandomFile('file_1', S10K)
|
||||||
makeRandomFile('file_2', S10K)
|
makeRandomFile('file_2', S10K)
|
||||||
file_1_sum = sha1sum('file_1')
|
file_1_sum = sha1sum('file_1')
|
||||||
ret = system("gcp -f file_1 file_2")
|
ret = system(GCP + " -f file_1 file_2")
|
||||||
self.assertEqual(ret,0)
|
self.assertEqual(ret,0)
|
||||||
file_2_sum_bis = sha1sum('file_2')
|
file_2_sum_bis = sha1sum('file_2')
|
||||||
self.assertEqual(file_1_sum, file_2_sum_bis)
|
self.assertEqual(file_1_sum, file_2_sum_bis)
|
||||||
|
|
||||||
def test_one_dir_copy(self):
|
def test_one_dir_copy(self):
|
||||||
"""Check copy of one dir to a non existant path"""
|
"""Checks copy of a directory to a non-existent path.
|
||||||
|
"""
|
||||||
makeTestDir('dir_1')
|
makeTestDir('dir_1')
|
||||||
check_1 = dirCheck('dir_1')
|
check_1 = dirCheck('dir_1')
|
||||||
ret = system("gcp -r dir_1 dir_2")
|
ret = system(GCP + " -r dir_1 dir_2")
|
||||||
self.assertEqual(ret,0)
|
self.assertEqual(ret,0)
|
||||||
check_2 = dirCheck('dir_2')
|
check_2 = dirCheck('dir_2')
|
||||||
self.assertEqual(check_1, check_2)
|
self.assertEqual(check_1, check_2)
|
||||||
|
|
||||||
def test_one_dir_copy_nocopy(self):
|
def test_one_dir_copy_nocopy(self):
|
||||||
"""Check that a dir is not copied without the recursive option"""
|
"""Checks that a directory is not copied without the recursive option.
|
||||||
|
"""
|
||||||
makeTestDir('dir_1')
|
makeTestDir('dir_1')
|
||||||
check_before = dirCheck('.')
|
check_before = dirCheck('.')
|
||||||
ret = system("gcp dir_1 dir_2")
|
ret = system(GCP + " dir_1 dir_2")
|
||||||
self.assertEqual(ret,0)
|
self.assertEqual(ret,0)
|
||||||
check_after = dirCheck('.')
|
check_after = dirCheck('.')
|
||||||
self.assertEqual(check_before, check_after)
|
self.assertEqual(check_before, check_after)
|
||||||
|
|
||||||
def test_one_dir_copy_existing_dest(self):
|
def test_one_dir_copy_existing_dest(self):
|
||||||
"""Check that a dir is copied inside an existing destination"""
|
"""Checks that a directory is copied inside an existing destination.
|
||||||
|
"""
|
||||||
makeTestDir('dir_1')
|
makeTestDir('dir_1')
|
||||||
mkdir('dir_2')
|
mkdir('dir_2')
|
||||||
check_1 = dirCheck('dir_1')
|
check_1 = dirCheck('dir_1')
|
||||||
ret = system("gcp -r dir_1 dir_2")
|
ret = system(GCP + " -r dir_1 dir_2")
|
||||||
self.assertEqual(ret,0)
|
self.assertEqual(ret,0)
|
||||||
self.assertEqual(listdir('dir_2'), ['dir_1'])
|
self.assertEqual(listdir('dir_2'), ['dir_1'])
|
||||||
check_2 = dirCheck('dir_2/dir_1')
|
check_2 = dirCheck('dir_2/dir_1')
|
||||||
self.assertEqual(check_1, check_2)
|
self.assertEqual(check_1, check_2)
|
||||||
|
|
||||||
def test_mixt_copy_existing_dest(self):
|
def test_mixed_copy_existing_dest(self):
|
||||||
"""Check that a mixt copy (files + dir) to an existing dest work as expected"""
|
"""Mixed copy (files + dir) to an existing destination.
|
||||||
|
"""
|
||||||
for i in range(2):
|
for i in range(2):
|
||||||
makeRandomFile('file_%d' % i, S10K)
|
makeRandomFile('file_%d' % i, S10K)
|
||||||
makeTestDir('dir_%d' % i)
|
makeTestDir('dir_%d' % i)
|
||||||
check_1 = dirCheck('.')
|
check_1 = dirCheck('.')
|
||||||
mkdir('dest_dir')
|
mkdir('dest_dir')
|
||||||
ret = system("gcp -r file_0 file_1 dir_0 dir_1 dest_dir")
|
ret = system(GCP + " -r file_0 file_1 dir_0 dir_1 dest_dir")
|
||||||
self.assertEqual(ret,0)
|
self.assertEqual(ret,0)
|
||||||
check_2 = dirCheck('dest_dir')
|
check_2 = dirCheck('dest_dir')
|
||||||
self.assertEqual(check_1, check_2)
|
self.assertEqual(check_1, check_2)
|
||||||
|
|
||||||
def test_mixt_copy_nonexisting_dest(self):
|
def test_mixed_copy_nonexisting_dest(self):
|
||||||
"""Check that a mixt copy (files + dir) to an non existing dest work as expected
|
"""Mixed copy (files + dir) to an inexistant destination.
|
||||||
/!\\ the behavious is different of the one of cp in this case ! (cp doesn't copy at all, while gcp create the dest)"""
|
/!\\ The behaviour is different than cp's! (cp doesn't copy at all,
|
||||||
|
while gcp create the destintation directory.)
|
||||||
|
"""
|
||||||
for i in range(2):
|
for i in range(2):
|
||||||
makeRandomFile('file_%d' % i, S10K)
|
makeRandomFile('file_%d' % i, S10K)
|
||||||
makeTestDir('dir_%d' % i)
|
makeTestDir('dir_%d' % i)
|
||||||
check_1 = dirCheck('.')
|
check_1 = dirCheck('.')
|
||||||
ret = system("gcp -r file_0 file_1 dir_0 dir_1 dest_dir")
|
ret = system(GCP + " -r file_0 file_1 dir_0 dir_1 dest_dir")
|
||||||
self.assertEqual(ret,0)
|
self.assertEqual(ret,0)
|
||||||
check_2 = dirCheck('dest_dir')
|
check_2 = dirCheck('dest_dir')
|
||||||
self.assertEqual(check_1, check_2)
|
self.assertEqual(check_1, check_2)
|
||||||
|
|
||||||
def test_mixt_copy_existing_dest_nonrecursive(self):
|
def test_mixed_copy_existing_dest_nonrecursive(self):
|
||||||
"""Check that a mixt copy (files + dir) to an existing dest without the recursive option work as expected"""
|
"""Non-recursive mixed copy (files + dir) to an existing destination.
|
||||||
|
"""
|
||||||
for i in range(2):
|
for i in range(2):
|
||||||
makeRandomFile('file_%d' % i, S10K)
|
makeRandomFile('file_%d' % i, S10K)
|
||||||
makeTestDir('dir_%d' % i)
|
makeTestDir('dir_%d' % i)
|
||||||
mkdir('dest_dir')
|
mkdir('dest_dir')
|
||||||
ret = system("gcp file_0 file_1 dir_0 dir_1 dest_dir")
|
ret = system(GCP + " file_0 file_1 dir_0 dir_1 dest_dir")
|
||||||
self.assertEqual(ret,0)
|
self.assertEqual(ret,0)
|
||||||
self.assertEqual(set(listdir('dest_dir')), set(['file_0', 'file_1']))
|
self.assertEqual(set(listdir('dest_dir')), set(['file_0', 'file_1']))
|
||||||
self.assertEqual(sha1sum('file_0'), sha1sum('dest_dir/file_0'))
|
self.assertEqual(sha1sum('file_0'), sha1sum('dest_dir/file_0'))
|
||||||
self.assertEqual(sha1sum('file_1'), sha1sum('dest_dir/file_1'))
|
self.assertEqual(sha1sum('file_1'), sha1sum('dest_dir/file_1'))
|
||||||
|
|
||||||
def test_mixt_copy_nonexisting_dest_nonrecursive(self):
|
def test_mixed_copy_nonexisting_dest_nonrecursive(self):
|
||||||
"""Check that a mixt copy (files + dir) to an existing dest without the recursive option work as expected"""
|
"""Non-recursive mixed copy (files + dir) to an inexistant destination.
|
||||||
|
"""
|
||||||
for i in range(2):
|
for i in range(2):
|
||||||
makeRandomFile('file_%d' % i, S10K)
|
makeRandomFile('file_%d' % i, S10K)
|
||||||
makeTestDir('dir_%d' % i)
|
makeTestDir('dir_%d' % i)
|
||||||
check_before = dirCheck('.')
|
check_before = dirCheck('.')
|
||||||
ret = system("gcp file_0 file_1 dir_0 dir_1 dest_dir")
|
ret = system(GCP + " file_0 file_1 dir_0 dir_1 dest_dir")
|
||||||
self.assertEqual(ret >> 8, 1)
|
self.assertEqual(ret >> 8, 1)
|
||||||
check_after = dirCheck('.')
|
check_after = dirCheck('.')
|
||||||
self.assertEqual(check_before, check_after)
|
self.assertEqual(check_before, check_after)
|
||||||
|
|
Loading…
Reference in New Issue