# tst_compression.py: tests for the zlib and shuffle compression filters,
# lossy quantization via least_significant_digit, fletcher32 checksums,
# and chunking in netcdf4-python.

from numpy.random import uniform
from netCDF4 import Dataset
from netCDF4.utils import _quantize
from numpy.testing import assert_almost_equal
import os, tempfile, unittest

ndim = 100000
ndim2 = 100
chunk1 = 10; chunk2 = ndim2
nfiles = 7
files = [tempfile.NamedTemporaryFile(suffix='.nc', delete=False).name for _ in range(nfiles)]
array = uniform(size=(ndim,))
array2 = uniform(size=(ndim,ndim2))
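# quantize to this many decimal digits ('least significant digit');
# lossy quantization makes the random data much more compressible.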
lsd = 3

def write_netcdf(filename,zlib,least_significant_digit,data,dtype='f8',shuffle=False,contiguous=False,\
                 chunksizes=None,complevel=6,fletcher32=False):
    file = Dataset(filename,'w')
    file.createDimension('n', ndim)
    foo = file.createVariable('data',\
            dtype,('n',),zlib=zlib,least_significant_digit=least_significant_digit,\
            shuffle=shuffle,contiguous=contiguous,complevel=complevel,fletcher32=fletcher32,chunksizes=chunksizes)
    # create a second variable using the compression kwarg instead of the deprecated zlib kwarg.
    if zlib:
        compression='zlib'
    else:
        compression=None
        # anything that evaluates to False is the same as None:
        #compression=False
        #compression=''
        #compression=0
        #compression='gzip' # should fail
    foo2 = file.createVariable('data2',\
            dtype,('n',),compression=compression,least_significant_digit=least_significant_digit,\
            shuffle=shuffle,contiguous=contiguous,complevel=complevel,fletcher32=fletcher32,chunksizes=chunksizes)
    foo[:] = data
    foo2[:] = data
    file.close()
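    # reopen the file and read the data back as a basic sanity check.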
    file = Dataset(filename)
    data = file.variables['data'][:]
    data2 = file.variables['data2'][:]
    file.close()

def write_netcdf2(filename,zlib,least_significant_digit,data,dtype='f8',shuffle=False,contiguous=False,\
                 chunksizes=None,complevel=6,fletcher32=False):
    file = Dataset(filename,'w')
    file.createDimension('n', ndim)
    file.createDimension('n2', ndim2)
    foo = file.createVariable('data2',\
            dtype,('n','n2'),zlib=zlib,least_significant_digit=least_significant_digit,\
            shuffle=shuffle,contiguous=contiguous,complevel=complevel,fletcher32=fletcher32,chunksizes=chunksizes)
    foo[:] = data
    file.close()
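    # read the data back to make sure the file is readable.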
    file = Dataset(filename)
    data = file.variables['data2'][:]
    file.close()

class CompressionTestCase(unittest.TestCase):

    def setUp(self):
        self.files = files
        # no compression
        write_netcdf(self.files[0],False,None,array)
        # compressed, lossless, no shuffle.
        write_netcdf(self.files[1],True,None,array)
        # compressed, lossless, with shuffle.
        write_netcdf(self.files[2],True,None,array,shuffle=True)
        # compressed, lossy, no shuffle.
        write_netcdf(self.files[3],True,lsd,array)
        # compressed, lossy, with shuffle.
        write_netcdf(self.files[4],True,lsd,array,shuffle=True)
        # compressed, lossy, with shuffle and fletcher32 checksum.
        write_netcdf(self.files[5],True,lsd,array,shuffle=True,fletcher32=True)
        # 2-d compressed, lossy, with shuffle and fletcher32 checksum and
        # chunksizes.
        write_netcdf2(self.files[6],True,lsd,array2,shuffle=True,fletcher32=True,chunksizes=(chunk1,chunk2))

    def tearDown(self):
        # Remove the temporary files
        for file in self.files:
            os.remove(file)

    def runTest(self):
        """testing zlib and shuffle compression filters"""
        uncompressed_size = os.stat(self.files[0]).st_size
        # check uncompressed data
        f = Dataset(self.files[0])
        size = os.stat(self.files[0]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert_almost_equal(array,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':False,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':0,'fletcher32':False}
        assert f.variables['data2'].filters() ==\
        {'zlib':False,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':0,'fletcher32':False}
        assert_almost_equal(size,uncompressed_size)
        f.close()
        # check compressed data.
        f = Dataset(self.files[1])
        size = os.stat(self.files[1]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert_almost_equal(array,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':6,'fletcher32':False}
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':6,'fletcher32':False}
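        # zlib alone should shrink the file noticeably.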
        assert(size < 0.95*uncompressed_size)
        f.close()
        # check compression with shuffle
        f = Dataset(self.files[2])
        size = os.stat(self.files[2]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert_almost_equal(array,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':False}
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':False}
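        # adding shuffle should improve the compression ratio further.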
        assert(size < 0.85*uncompressed_size)
        f.close()
        # check lossy compression without shuffle
        f = Dataset(self.files[3])
        size = os.stat(self.files[3]).st_size
        checkarray = _quantize(array,lsd)
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert_almost_equal(checkarray,f.variables['data2'][:])
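        # quantizing to lsd decimal digits makes the data far more compressible.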
        assert(size < 0.27*uncompressed_size)
        f.close()
        # check lossy compression with shuffle
        f = Dataset(self.files[4])
        size = os.stat(self.files[4]).st_size
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert_almost_equal(checkarray,f.variables['data2'][:])
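        # shuffle should help even more on the quantized data.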
        assert(size < 0.20*uncompressed_size)
        size_save = size
        f.close()
        # check lossy compression with shuffle and fletcher32 checksum.
        f = Dataset(self.files[5])
        size = os.stat(self.files[5]).st_size
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert_almost_equal(checkarray,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':True}
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':True}
        assert(size < 0.20*uncompressed_size)
        # should be slightly larger than without fletcher32
        assert(size > size_save)
        # check chunksizes
        f.close()
        f = Dataset(self.files[6])
        checkarray2 = _quantize(array2,lsd)
        assert_almost_equal(checkarray2,f.variables['data2'][:])
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':True}
        assert f.variables['data2'].chunking() == [chunk1,chunk2]
        f.close()

if __name__ == '__main__':
    unittest.main()