# Source code for soma.aims.demotools


'''
Utility module to list and install demo datasets available on the BrainVisa server.

The main functions are :func:`get_demo_datasets` and :func:`install_demo_data`.
'''

from soma import aims
from urllib.request import urlopen
import os
import os.path as osp
import zipfile
import tarfile
import re


def unzip_zip(filename, destdir):
    '''Extract a ``.zip`` archive into *destdir* and return the set of
    top-level directories that received extracted entries.

    Existing directories in *destdir* are left untouched; existing files
    are removed first, then re-extracted.
    '''
    # open the ZipFile directly as a context manager: the original code
    # closed the underlying file object but never the ZipFile itself.
    with zipfile.ZipFile(filename, 'r') as zf:
        # extract zip files one by one
        # extractall() on the full name list is not an option since on Mac
        # at least it tries to re-make directories even if they exist
        fnlist = []
        for name in zf.namelist():
            dname = os.path.join(destdir, name)
            if os.path.exists(dname):
                if not os.path.isdir(dname):
                    # existing file: remove it first, then re-extract
                    os.unlink(dname)
                    fnlist.append(name)
                # existing directories are skipped entirely
            else:
                fnlist.append(name)
        zf.extractall(destdir, fnlist)

    # drop the trailing '/' that marks directory entries
    fnlist = [f[:-1] if f.endswith('/') else f for f in fnlist]
    # for each extracted entry, record the directory it lives in
    dirs = {osp.join(destdir, f)
            if osp.isdir(osp.join(destdir, f))
            else osp.dirname(osp.join(destdir, f))
            for f in fnlist}
    # keep only top-most directories: repeatedly drop any dir whose parent
    # is also in the set, until the set stops shrinking
    n = len(dirs)
    m = n + 1
    while n != m:
        dirs = {d for d in dirs if osp.dirname(d) not in dirs}
        m = n
        n = len(dirs)
    return dirs


def unzip_tar(filename, destdir):
    '''Extract a tar archive (any compression ``tarfile`` understands)
    into *destdir* and return the set of top-level directories that
    received extracted members.
    '''
    with tarfile.open(filename, 'r') as tf:
        names = tf.getnames()
        # NOTE(review): extractall() without a filter allows path traversal
        # from malicious archives on older Pythons -- assumed the BrainVisa
        # archives are trusted; confirm.
        tf.extractall(destdir)

    # normalize member names: directory entries carry a trailing '/'
    normalized = []
    for name in names:
        if name.endswith('/'):
            name = name[:-1]
        normalized.append(name)

    # record, for every member, the directory it lives in
    dirs = set()
    for name in normalized:
        full = osp.join(destdir, name)
        dirs.add(full if osp.isdir(full) else osp.dirname(full))

    # keep only the top-most directories: repeatedly prune entries whose
    # parent is also present, until a fixed point is reached
    while True:
        pruned = {d for d in dirs if osp.dirname(d) not in dirs}
        if len(pruned) == len(dirs):
            break
        dirs = pruned
    return dirs


def unzip_file(filename, destdir=None):
    '''unzip / untar the given archive in the directory it is

    Parameters
    ----------
    filename: str
        path of the archive; ``.zip`` goes through :func:`unzip_zip`,
        anything else through :func:`unzip_tar`
    destdir: str, optional
        extraction directory; defaults to the directory containing
        *filename*

    Returns
    -------
    the set of top-level directories the archive was extracted into.
    '''
    # (fixed: stray "[docs]" Sphinx-scrape token made this line invalid)
    if destdir is None:
        destdir = osp.dirname(filename)
    if filename.endswith('.zip'):
        return unzip_zip(filename, destdir)
    return unzip_tar(filename, destdir)
def url_listdir(url):
    '''List the entries of an Apache-style directory index page at *url*.

    Returns the ``href`` targets of every link appearing after the
    'Parent Directory' anchor text (file and sub-directory names).
    '''
    html = urlopen(url).read().decode()
    # skip the header part of the index page: links before the
    # 'Parent Directory' anchor are sorting/navigation links
    listing = html[html.find('Parent Directory'):]
    # NOTE(review): if 'Parent Directory' is absent, find() returns -1 and
    # only the last character is scanned -- assumed not to happen on the
    # BrainVisa index pages; confirm.
    href = re.compile('<a href="([^"]+)">', re.M)
    # use the compiled pattern's own findall instead of re.findall(rexp, ...)
    return href.findall(listing)
def get_demo_datasets(download_url='https://brainvisa.info/download/data'):
    '''List available demo datasets on the BrainVisa server

    Only archive files (``.zip``, ``.tar.gz``, ``.tar.bz2``) are kept;
    sub-directory links, descriptive-models and atlas archives are
    filtered out.
    '''
    files = url_listdir(download_url)
    # str.endswith accepts a tuple of suffixes: one call replaces the
    # original len([...]) == 0 scan
    allowed_exts = ('.zip', '.tar.gz', '.tar.bz2')
    filtered = []
    for fname in files:
        if fname.endswith('/'):
            continue  # directory entry
        if not fname.endswith(allowed_exts):
            continue  # not a supported archive format
        # exclude non-demo datasets by name
        if ('descriptive_models' in fname or '_atlas_' in fname
                or 'Atlas' in fname):
            continue
        filtered.append(fname)
    return filtered
def install_demo_data(dataset='test_data.zip', download_dir=None,
                      download_url='https://brainvisa.info/download/data',
                      install_dir=None):
    '''Download a demo dataset from the BrainVisa server to the
    ``download_dir`` directory, and install it in the local filesystem in
    ``install_dir``.

    If ``download_dir`` is not given, the output download location will be
    guessed as such:

    1. try to write in ``$BRAINVISA_SHARE/brainvisa_demo/``
    2. if ``install_dir`` is provided, download in ``install_dir``
    3. try to write in the current directory
    4. raise an error

    If ``install_dir`` is not given, download and install will be done in
    the same directory, namely ``download_dir``. If ``install_dir`` is
    given, the archive will be extracted to this directory.

    The return value is the dataset directories on the local filesystem.
    '''
    full_url = download_url + '/' + dataset
    if download_dir is None:
        download_dir = _find_writable_dir(install_dir)
    print('download dir:', download_dir)
    if install_dir is None:
        install_dir = download_dir
    elif not osp.exists(install_dir):
        os.makedirs(install_dir)
    print('install dir:', install_dir)

    zf = osp.join(download_dir, dataset)
    # download to a '.part' file, renamed only once complete, so an
    # interrupted download is never mistaken for a finished one
    tzf = zf + '.part'
    with urlopen(full_url) as fr:
        # NOTE(review): assumes the server always sends Content-Length;
        # int(None) would raise TypeError otherwise -- confirm.
        fsize = int(fr.headers.get('content-length'))
        if osp.exists(zf) and os.stat(zf).st_size == fsize:
            print('already downloaded, skipping download.')
        else:
            _download_stream(fr, tzf, fsize, full_url)
            if osp.exists(zf):
                os.unlink(zf)
            os.rename(tzf, zf)
            print('download done')

    print('installing', dataset, '...')
    return unzip_file(zf, destdir=install_dir)


def _find_writable_dir(install_dir):
    '''Return the first writable candidate download directory, creating it
    if needed; raise ValueError when none is writable.'''
    candidates = [osp.join(aims.carto.Paths.globalShared(),
                           'brainvisa_demo'),
                  os.getcwd()]
    if install_dir is not None:
        candidates.insert(1, install_dir)
    for cand in candidates:
        if not osp.exists(cand):
            try:
                os.makedirs(cand)
            except Exception:
                pass  # creation failed: the write test below will reject it
        # probe actual write access by creating and removing a small file
        try:
            testfile = osp.join(cand, 'testfile')
            with open(testfile, 'w') as f:
                f.write('test writing\n')
            os.unlink(testfile)
            return cand
        except Exception:
            pass  # not writable, try the next candidate
    # (fixed typo: 'Cound' -> 'Could')
    raise ValueError('Could not find a suitable writable directory '
                     'for download.')


def _download_stream(fr, out_filename, fsize, full_url):
    '''Stream the open response *fr* to *out_filename* in chunks,
    printing a progress percentage based on *fsize*.'''
    chunksize = 100000
    fread = 0
    with open(out_filename, 'wb') as fw:
        while fread < fsize:
            pg = fread * 100 / fsize
            print(f'\r{full_url}: {int(pg)}%', end='')
            data = fr.read(chunksize)
            if not data:
                # connection closed early; without this the original code
                # looped forever since fread never advanced
                break
            fw.write(data)
            fread += len(data)
    pg = fread * 100 / fsize
    print(f'\r{full_url}: {int(pg)}%')