Source code for OceanColor.storage
"""Store and manage NASA data
Different backends allow for different ways to handle the data from NASA.
"""
from datetime import datetime, timedelta
import logging
import numpy as np
import random
import tempfile
import time
import threading
import xarray as xr
from .gsfc import read_remote_file
# To guarantee backward compatibility
from .backend import *
# Module-level logger; OceanColorDB._download uses it to announce a download.
module_logger = logging.getLogger("OceanColor.storage")
class OceanColorDB(object):
    """An abstraction of NASA's Ocean Color database

    While OceanColorDB provides access to NASA's ocean color data, it is the
    backend that manages the data accessed. Currently, there is only one
    backend based on local files and directories. More alternatives, such as
    AWS S3 storage, are planned.

    Examples
    --------
    >>> db = OceanColorDB(username, password)
    >>> db.backend = FileSystem('./')
    >>> ds = db['T2004006.L3m_DAY_CHL_chlor_a_4km.nc']
    >>> ds.attrs

    Notes
    -----
    Think about the best way to define the backend. Maybe add an optional
    parameter path, which if available is used to define the backend as a
    FileSystem.
    """

    logger = logging.getLogger("OceanColor.storage.OceanColorDB")
    # Default backend; replace it on the instance (see Examples above).
    backend = BaseStorage()
    # Class attribute, so every instance serializes downloads on this lock.
    lock = threading.Lock()
    # Epoch default means the first download never has to wait.
    time_last_download = datetime(1970, 1, 1)

    def __init__(self, username: str, password: str, download: bool = True):
        """Initializes OceanColorDB

        Parameters
        ----------
        username: str
            The username registered with EarthData
        password: str
            The password associated with the username
        download: bool, optional
            Download new data when required, otherwise limits to the already
            available datasets. Default is true, i.e. download when necessary.
        """
        self.logger.debug("Instantiating OceanColorDB")
        self.username = username
        self.password = password
        self.download = download

    def __contains__(self, item: str) -> bool:
        """Return whether ``item`` is already available in the backend"""
        return item in self.backend

    def __getitem__(self, key):
        """Return the dataset for ``key``, downloading it if allowed

        The backend is tried first. If it raises KeyError and downloading is
        enabled, the item is downloaded, stored in the backend and returned.

        Raises
        ------
        KeyError
            If ``key`` is not in the backend and ``download`` is off.

        Notes
        -----
        Maybe use BytesIO?? or ds.compute()?
        """
        self.logger.debug(f"Reading from backend: {key}")
        try:
            return self.backend[key]
        except KeyError:
            self.logger.debug(f"{key} is not on the storage")
            if not self.download:
                self.logger.info(
                    f"{key} is not available and download is off."
                )
                # Carry the missing key so callers can tell what failed.
                raise KeyError(key)
            return self._download(key)

    def _download(self, index):
        """Download ``index`` from NASA, store it in the backend, return it

        The remote content is written to a temporary file and opened with
        xarray. For L2 files, the groups ``geophysical_data`` and
        ``navigation_data`` are merged into the dataset, a ``time`` variable
        is built from ``scan_line_attributes``, and ``latitude``/``longitude``
        are renamed to ``lat``/``lon``.

        Raises
        ------
        AssertionError
            If the processing level is neither "L2" nor "L3 Mapped".
        """
        module_logger.debug("Downloading from Ocean Color: {}".format(index))
        # Probably move this reading from remote to another function
        content = self._remote_content(index)
        # ds = xr.open_dataset(BytesIO(content))
        # Seems like it can't read groups using BytesIO
        with tempfile.NamedTemporaryFile(mode="w+b", delete=True) as tmp:
            self.logger.debug(f"Saving to temporary file: {tmp.name}")
            tmp.write(content)
            # Flush so that xarray, reopening by name, sees the whole file.
            tmp.flush()

            ds = xr.open_dataset(tmp.name)
            # NOTE(review): assert is stripped under ``python -O``; kept
            # because callers may rely on AssertionError.
            assert ds.processing_level in (
                "L2",
                "L3 Mapped",
            ), "I only handle L2 or L3 Mapped"
            if ds.processing_level == "L2":
                geo = xr.open_dataset(tmp.name, group="geophysical_data")
                ds = ds.merge(geo)
                nav = xr.open_dataset(tmp.name, group="navigation_data")
                ds = ds.merge(nav)
                # Maybe include full scan line into ds
                sline = xr.open_dataset(tmp.name, group="scan_line_attributes")
                # Combine year, day and msec into one timestamp; presumably
                # day is a 1-based day of year, hence the one-day offset.
                ds["time"] = (
                    (sline - 1970).year.astype("datetime64[Y]")
                    + sline.day
                    - np.timedelta64(1, "D")
                    + sline.msec
                )
                ds = ds.rename({"latitude": "lat", "longitude": "lon"})
            # Store while the temporary file still exists.
            self.backend[index] = ds
        return ds

    def _remote_content(
        self, filename: str, t_min: int = 4, t_random: int = 4
    ):
        """Read a remote file with a minimum time between downloads

        NASA monitors the downloads and excessive activity is temporarily
        banned, so this function guarantees a minimum time between downloads
        to avoid overloading NASA servers.

        Parameters
        ----------
        filename: str
            Name of the remote file to read.
        t_min: int, optional
            Minimum time, in seconds, since the previous download.
        t_random: int, optional
            Upper limit, in seconds, of a random extra wait added to t_min.

        Returns
        -------
        The content returned by ``read_remote_file``.
        """
        self.logger.debug("Acquiring lock for remote content")
        # The context manager releases the lock even if the wait is
        # interrupted or the download fails.
        with self.lock:
            self.logger.debug("Lock acquired")
            dt = t_min + round(random.random() * t_random, 2)
            next_time = self.time_last_download + timedelta(seconds=(dt))
            waiting_time = max(
                (next_time - datetime.now()).total_seconds(), 0
            )
            self.logger.debug(
                f"Waiting {waiting_time} seconds before downloading."
            )
            time.sleep(waiting_time)
            try:
                self.logger.info(f"Downloading: {filename}")
                content = read_remote_file(
                    filename, self.username, self.password
                )
            finally:
                # Record the attempt even on failure so retries are throttled.
                # NOTE(review): this sets an instance attribute, while the
                # lock is shared by the class -- confirm whether throttling
                # is meant to be shared among instances.
                self.time_last_download = datetime.now()
                self.logger.debug("remote_content releasing lock")
        return content

    def check(self, index):
        """Confirm that index is available, otherwise, download it

        Useful in a pre-processing stage to guarantee that all required data
        is available. For instance a cronjob could run periodically just
        downloading new data so that it is available when the analysis is
        actually running.
        """
        if index in self:
            self.logger.debug(f"Item already available: {index}")
        else:
            ds = self._download(index)
            # Only the side effect of storing matters here.
            ds.close()