Codebase list netcdf4-python / debian/1.4.2-1 test / tst_multifile.py
debian/1.4.2-1

Tree @debian/1.4.2-1 (Download .tar.gz)

tst_multifile.py @debian/1.4.2-1raw · history · blame

from netCDF4 import Dataset, MFDataset, MFTime, num2date, date2num, date2index
import numpy as np
from numpy.random import seed, randint
from numpy.testing import assert_array_equal, assert_equal
from numpy import ma
import tempfile, unittest, os, datetime
import cftime
from pkg_resources import parse_version

nx=100; ydim=5; zdim=10
nfiles = 10
ninc = nx/nfiles
files = [tempfile.NamedTemporaryFile(suffix='.nc', delete=False).name for nfile in range(nfiles)]
data = randint(0,10,size=(nx,ydim,zdim))
missval = 99
data[::10] = missval
data = ma.masked_values(data,missval)

class VariablesTestCase(unittest.TestCase):

    def setUp(self):
        self.files = files
        for nfile,file in enumerate(self.files):
            f = Dataset(file,'w',format='NETCDF4_CLASSIC')
            f.createDimension('x',None)
            f.createDimension('y',ydim)
            f.createDimension('z',zdim)
            f.history = 'created today'
            x = f.createVariable('x','i',('x',))
            x.units = 'zlotys'
            dat = f.createVariable('data','i',('x','y','z',))
            dat.long_name = 'phony data'
            dat.missing_value = missval
            nx1 = int(nfile*ninc); nx2 = int(ninc*(nfile+1))
            #x[0:ninc] = np.arange(nfile*ninc,ninc*(nfile+1))
            x[:] = np.arange(nfile*ninc,ninc*(nfile+1))
            #dat[0:ninc] = data[nx1:nx2]
            dat[:] = data[nx1:nx2]
            f.close()

    def tearDown(self):
        # Remove the temporary files
        for file in self.files:
            os.remove(file)

    def runTest(self):
        """testing multi-file dataset access"""
        f = MFDataset(self.files,check=True)
        f.set_auto_maskandscale(True) # issue570
        assert f.history == 'created today'
        assert_array_equal(np.arange(0,nx),f.variables['x'][:])
        varin = f.variables['data']
        datin = varin[:]
        assert_array_equal(datin.mask,data.mask)
        varin.set_auto_maskandscale(False)
        data2 = data.filled()
        assert varin.long_name == 'phony data'
        assert len(varin) == nx
        assert varin.shape == (nx,ydim,zdim)
        assert varin.dimensions == ('x','y','z')
        assert_array_equal(varin[4:-4:4,3:5,2:8],data2[4:-4:4,3:5,2:8])
        assert varin[0,0,0] == data2[0,0,0]
        assert_array_equal(varin[:],data2)
        assert getattr(varin,'nonexistantatt',None) == None
        f.close()
        # test master_file kwarg (issue #835).
        f = MFDataset(self.files,master_file=self.files[-1],check=True)
        assert_array_equal(np.arange(0,nx),f.variables['x'][:])
        varin = f.variables['data']
        assert_array_equal(varin[4:-4:4,3:5,2:8],data2[4:-4:4,3:5,2:8])
        f.close()
        # testing multi-file get_variables_by_attributes
        f = MFDataset(self.files,check=True)
        assert f.get_variables_by_attributes(axis='T') == []
        f.get_variables_by_attributes(units='zlotys')[0] == f['x']
        f.close()

class NonuniformTimeTestCase(unittest.TestCase):
    ninc = 365
    def setUp(self):

        self.files = [tempfile.NamedTemporaryFile(suffix='.nc', delete=False).name for nfile in range(2)]
        for nfile,file in enumerate(self.files):
            f = Dataset(file,'w',format='NETCDF4_CLASSIC')
            f.createDimension('time',None)
            f.createDimension('y',ydim)
            f.createDimension('z',zdim)
            f.history = 'created today'

            time = f.createVariable('time', 'f', ('time', ))
            #time.units = 'days since {0}-01-01'.format(1979+nfile)
            yr = 1979+nfile
            time.units = 'days since %s-01-01' % yr

            # Do not set the calendar attribute on the created files to test calendar
            # overload.
            # time.calendar = 'standard'

            x = f.createVariable('x','f',('time', 'y', 'z'))
            x.units = 'potatoes per square mile'

            nx1 = self.ninc*nfile;
            nx2 = self.ninc*(nfile+1)

            time[:] = np.arange(self.ninc)
            x[:] = np.arange(nx1, nx2).reshape(self.ninc,1,1) * np.ones((1, ydim, zdim))

            f.close()

    def tearDown(self):
        # Remove the temporary files
        for file in self.files:
            os.remove(file)


    def runTest(self):
        # The test files have no calendar attribute on the time variable.
        calendar = 'standard'

        # Get the real dates
        # skip this until cftime pull request #55 is in a released
        # version (1.0.1?). Otherwise, fix for issue #808 breaks this
        if parse_version(cftime.__version__) >= parse_version('1.0.1'):
            dates = []
            for file in self.files:
                f = Dataset(file)
                t = f.variables['time']
                dates.extend(num2date(t[:], t.units, calendar))
                f.close()

        # Compare with the MF dates
        f = MFDataset(self.files,check=True)
        t = f.variables['time']

        T = MFTime(t, calendar=calendar)
        assert_equal(T.calendar, calendar)
        assert_equal(len(T), len(t))
        assert_equal(T.shape, t.shape)
        assert_equal(T.dimensions, t.dimensions)
        assert_equal(T.typecode(), t.typecode())
        # skip this until cftime pull request #55 is in a released
        # version (1.0.1?). Otherwise, fix for issue #808 breaks this
        if parse_version(cftime.__version__) >= parse_version('1.0.1'):
            assert_array_equal(num2date(T[:], T.units, T.calendar), dates)
        assert_equal(date2index(datetime.datetime(1980, 1, 2), T), 366)
        f.close()

        # Test exception is raised when no calendar attribute is available on the
        # time variable.
        with MFDataset(self.files, check=True) as ds:
            with self.assertRaises(ValueError):
                MFTime(ds.variables['time'])

        # Test exception is raised when the calendar attribute is different on the
        # variables. First, add calendar attributes to file. Note this will modify
        # the files inplace.
        calendars = ['standard', 'gregorian']
        for idx, f in enumerate(self.files):
            with Dataset(f, 'a') as ds:
                ds.variables['time'].calendar = calendars[idx]
        with MFDataset(self.files, check=True) as ds:
            with self.assertRaises(ValueError):
                MFTime(ds.variables['time'])


if __name__ == '__main__':
    unittest.main()