Source code for eedl.helpers

import os
import itertools
import datetime
from typing import Optional

from .core import safe_fiona_open
from .image import EEDLImage, TaskRegistry

import ee
from ee import ImageCollection


class CollectionExtractor:
    """
    A thin layer over EEDLImage intended to export each item in a collection.

    The design goal is to minimize regridding of the raster by not applying any
    strict boundary filtering: provide a collection that has been filtered to a
    geometry and every image inside it is exported without clipping. Choosing the
    same CRS for the output as for the input should avoid regridding the raster.

    Every keyword argument passed to the constructor is stored as an attribute of
    the same name, overriding the class-level defaults below.
    """

    # Class-level defaults; any of these can be overridden per instance via kwargs.
    collection: Optional[ee.ImageCollection] = None
    collection_band: Optional[str] = None
    time_start: Optional[str] = None
    time_end: Optional[str] = None
    mosaic_by_date: Optional[bool] = True

    def __init__(self, **kwargs):
        """Store each keyword argument on the instance under its own name."""
        for name, value in kwargs.items():
            setattr(self, name, value)
class GroupedCollectionExtractor:
    """
    Export a collection per region of interest and run zonal statistics on each image.

    The GroupedCollectionExtractor is currently the most powerful tool in the package,
    though it has some limits based on its assumptions. Using this class for an export
    allows you to provide spatial data (as polygons) indicating regions of interest (ROIs),
    as well as a separate set of spatial data indicating the polygons to extract data for
    within each ROI. With that information, the class will export every image from a
    collection within the date range you provide, clipped to each ROI, then obtain zonal
    statistics for the polygons within each ROI.

    You'll need to pay special attention to the parameters to the initialization function.
    Every keyword argument is stored as an attribute of the same name, overriding the
    defaults assigned in ``__init__``.

    Attributes:
        on_error (str, "log"): What to do when an error occurs. Options are "log", which
            writes it to string and disk, and "raise" which raises the error as an
            exception and stops execution of the class.
        skip_existing (bool, True): A feature allowing it to resume from crashes. If the
            mosaic image exists, it skips doing any processing on the rest of it.
        keep_image_objects (bool, False): Whether to store the EEDLImage objects as part of
            this class, so they can be accessed when it's done. Off by default so that
            large exports don't hold them all in RAM. This does *not* specify anything
            about whether the image data is written to disk - that happens automatically
            and by default.
    """

    def __init__(self, **kwargs):
        """
        Set the default configuration, then apply any keyword overrides.

        Args:
            **kwargs: Attribute names and values to set on the instance. Applied after
                the defaults, so any default below can be replaced.
        """
        # Whether to store the EEDLImage objects on this class, so they can be accessed when it's done.
        # Off by default so that large exports don't hold them all in RAM.
        self.keep_image_objects = False
        # All the exported images are saved here. They can then be operated on once the extractor is complete.
        self.all_images = []
        # A feature allowing it to resume from crashes. If the mosaic image exists, it skips doing any
        # processing on the rest of it.
        self.skip_existing = True
        self.on_error = "log"

        self.filename_description = ""

        self.collection = None
        self.collection_band = None
        self.time_start = None
        self.time_end = None
        self.mosaic_by_date = True
        # The path to a spatial data file readable by Fiona/GEOS that has features defining AOIs to
        # extract individually.
        self.areas_of_interest_path = None
        # May be necessary for some things to behave, so keeping this as a default to True. People can
        # disable if they know what they're doing (maybe faster).
        self.strict_clip = True
        self.export_type = "drive"
        self.drive_root_folder = None
        self.cloud_bucket = None
        self.download_folder = None  # Local folder name after downloading for processing.
        self.export_folder = None  # Drive/cloud export folder name.

        self.zonal_run = True
        # What is the attribute on each of the AOI polygons that tells us what items to use in the
        # zonal extraction.
        self.zonal_areas_of_interest_attr = None
        self.zonal_features_path = None  # What polygons to use as inputs for zonal stats.
        # What field in the zonal features has the value that should match zonal_areas_of_interest_attr?
        self.zonal_features_area_of_interest_attr = None
        # What fields to preserve, as a tuple - typically an ID and anything else you want.
        self.zonal_features_preserve_fields = None
        self.zonal_stats_to_calc = ()  # What statistics to output by zonal feature.
        self.zonal_use_points = False
        self.zonal_inject_date: bool = False
        self.zonal_inject_group_id: bool = False
        self.zonal_nodata_value: int = 0

        self.merge_sqlite = True  # Should we merge all outputs to a single SQLite database.
        self.merge_grouped_csv = True  # Should we merge CSV by grouped item.
        self.merge_final_csv = False  # Should we merge all output tables.

        self._all_outputs = []  # For storing the paths to all output csv files.

        # Threshold where we switch from keeping fiona features in memory as a list to using
        # itertools.tee to split the iterator. Not currently consulted: extract() always uses tee.
        self.max_fiona_features_load = 1000

        for name, value in kwargs.items():
            setattr(self, name, value)

    @staticmethod
    def _ms_timestamp_to_date_string(timestamp_ms):
        """
        Return the UTC calendar date (``YYYY-MM-DD``) for a millisecond epoch timestamp.

        Floor division keeps the conversion in integer arithmetic (no float round trip)
        and, unlike slicing the last three characters off the string form, also works
        for timestamps smaller than 1000 ms.

        Args:
            timestamp_ms: Milliseconds since the Unix epoch.

        Returns:
            str: The date formatted as ``%Y-%m-%d`` in UTC.
        """
        timestamp_in_seconds = int(timestamp_ms) // 1000
        as_utc = datetime.datetime.fromtimestamp(timestamp_in_seconds, tz=datetime.timezone.utc)
        return as_utc.strftime("%Y-%m-%d")

    def _single_item_extract(self, image, task_registry, zonal_features, aoi_attr, ee_geom, image_date, aoi_download_folder):
        """
        Configure an EEDLImage for one image of one AOI and start its export.

        If ``skip_existing`` is set and the mosaic for this AOI/date is already present,
        a message is printed and nothing is exported.

        Args:
            image: The Earth Engine image handed to ``EEDLImage.export``.
            task_registry: The TaskRegistry the new EEDLImage is created with.
            zonal_features: The features assigned to the EEDLImage as its zonal polygons.
            aoi_attr: The AOI's identifying attribute value. Used in the filename suffix
                and, when ``zonal_inject_group_id`` is set, injected as ``group_id``.
            ee_geom: The geometry passed to the export as ``clip``.
            image_date: Date string for the image. Used as the EEDLImage ``date_string``,
                in the filename suffix and, when ``zonal_inject_date`` is set, injected
                as ``date``.
            aoi_download_folder: Folder passed to ``check_mosaic_exists`` when looking
                for an existing mosaic.

        Returns:
            None
        """
        export_image = EEDLImage(
            task_registry=task_registry,
            drive_root_folder=self.drive_root_folder,
            cloud_bucket=self.cloud_bucket,
            filename_description=self.filename_description
        )
        export_image.zonal_polygons = zonal_features
        export_image.zonal_use_points = self.zonal_use_points
        export_image.zonal_keep_fields = self.zonal_features_preserve_fields
        export_image.zonal_stats_to_calc = self.zonal_stats_to_calc
        export_image.zonal_nodata_value = self.zonal_nodata_value
        export_image.date_string = image_date

        # Constant values to add to the zonal output, enabled individually by the flags.
        zonal_inject_constants = {}
        if self.zonal_inject_date:
            zonal_inject_constants["date"] = image_date
        if self.zonal_inject_group_id:
            zonal_inject_constants["group_id"] = aoi_attr
        export_image.zonal_inject_constants = zonal_inject_constants

        filename_suffix = f"{aoi_attr}_{image_date}"
        if self.skip_existing and export_image.check_mosaic_exists(aoi_download_folder, self.export_folder, f"{self.filename_description}_{filename_suffix}"):
            print(f"Image {filename_suffix} exists and skip_existing=True. Skipping")
            return

        export_image.export(
            image,
            export_type=self.export_type,
            filename_suffix=filename_suffix,
            clip=ee_geom,
            strict_clip=self.strict_clip,
            folder=self.export_folder,  # The folder to export to in Google Drive
        )
        # TODO: This all still needs some work.

    def extract(self):
        """
        Run the export for every area of interest.

        For each feature in ``areas_of_interest_path`` this filters the collection to the
        feature's geometry, starts an export for every image in the filtered collection,
        then waits on the AOI's task registry with the ``"mosaic_and_zonal"`` callback.
        Both opened data sources are closed even when an error is raised.

        Returns:
            None
        """
        collection = self._get_and_filter_collection()

        # Now we need to get each polygon to filter the bounds to and make a new collection with
        # filterBounds for just that geometry.
        self._all_outputs = []
        features = safe_fiona_open(self.areas_of_interest_path)
        try:
            num_complete = 0
            for feature in features:
                print(f"Number of complete AOIs: {num_complete}")
                task_registry = TaskRegistry()

                ee_geom = ee.Geometry.Polygon(feature['geometry']['coordinates'][0])  # WARNING: THIS DOESN'T CHECK CRS
                aoi_collection = collection.filterBounds(ee_geom)

                # Get some variables defined for use in extracting the zonal stats.
                # This is the value we'll search for in the zonal features.
                aoi_attr = feature.properties[self.zonal_areas_of_interest_attr]
                # NOTE(review): aoi_attr is interpolated unescaped, so a value containing a
                # single quote will break this filter expression.
                zonal_features_query = f"{self.zonal_features_area_of_interest_attr} = '{aoi_attr}'"
                # str() so that non-string attribute values (e.g. integer IDs) can be used as a folder name.
                aoi_download_folder = os.path.join(self.download_folder, str(aoi_attr))

                fiona_zonal_features = safe_fiona_open(self.zonal_features_path)
                try:
                    zonal_features_filtered = fiona_zonal_features.filter(where=zonal_features_query)

                    image_list = aoi_collection.toList(aoi_collection.size()).getInfo()
                    timestamps = [im['properties']['system:time_start'] for im in image_list]

                    # Give every image its own iterator over the filtered features. An earlier draft
                    # loaded the features into a list when there were fewer than
                    # max_fiona_features_load of them; that path is disabled, so tee is always used.
                    zonal_feature_iterators = itertools.tee(zonal_features_filtered, len(image_list))

                    for timestamp, zonal_features in zip(timestamps, zonal_feature_iterators):
                        # Get the image from the collection again based on its start time.
                        image = aoi_collection.filter(ee.Filter.eq("system:time_start", timestamp)).first()
                        date_string = self._ms_timestamp_to_date_string(timestamp)

                        self._single_item_extract(image, task_registry, zonal_features, aoi_attr, ee_geom, date_string, aoi_download_folder)

                    # All exports for this AOI have been started; wait for them and let the registry
                    # run the mosaic and zonal statistics callback on each image.
                    task_registry.setup_log(os.path.join(self.download_folder, "eedl_processing_error_log.txt"))
                    task_registry.wait_for_images(aoi_download_folder, sleep_time=15, callback="mosaic_and_zonal", try_again_disk_full=False, on_failure=self.on_error)

                    if self.keep_image_objects:
                        self.all_images.extend(task_registry.images)
                finally:
                    fiona_zonal_features.close()

                num_complete += 1
        finally:
            features.close()

    def _get_and_filter_collection(self):
        """
        Build the image collection with the configured date, band and mosaic settings.

        The date filter is applied when either ``time_start`` or ``time_end`` is set, the
        band selection when ``collection_band`` is set, and the per-date mosaic when
        ``mosaic_by_date`` is truthy.

        Returns:
            The resulting image collection.
        """
        collection = ImageCollection(self.collection)

        if self.time_start or self.time_end:
            collection = collection.filterDate(self.time_start, self.time_end)

        if self.collection_band:
            collection = collection.select(self.collection_band)

        if self.mosaic_by_date:
            # We're supposed to take the images in the collection and merge them so that all images
            # on one date are a single image.
            collection = mosaic_by_date(collection)

        return collection
def mosaic_by_date(image_collection):
    """
    Combine the images of a collection into one mosaicked image per calendar date.

    Adapted to Python from code found via https://gis.stackexchange.com/a/343453/1955

    Each output image has ``system:time_start`` set to the date's millisecond value,
    ``system:id`` and ``system:index`` set to the formatted date, and is renamed to
    the formatted date.

    :param image_collection: An image collection
    :return: ee.ImageCollection
    """
    date_format = "YYYY-MM-dd"

    def _formatted_date(im):
        # Reduce an image to its formatted acquisition date.
        return ee.Image(im).date().format(date_format)

    def _make_mosaicked_image(d):
        day = ee.Date(d)
        # Everything from this date up to (not including) the following day.
        same_day_images = image_collection.filterDate(day, day.advance(1, "day"))
        mosaic = same_day_images.mosaic()
        tagged = mosaic.set(
            "system:time_start", day.millis(),
            "system:id", day.format(date_format),
            "system:index", day.format(date_format)
        )
        return tagged.rename(day.format(date_format))

    all_images = image_collection.toList(image_collection.size())
    unique_dates = all_images.map(_formatted_date).distinct()

    return ee.ImageCollection(unique_dates.map(_make_mosaicked_image))