Source code for eedl.image

import os
import io
import shutil
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from typing_extensions import TypedDict, NotRequired, Unpack
import traceback
import datetime

import ee
from ee import EEException

from . import google_cloud
from . import mosaic_rasters
from . import zonal


[docs] class EEExportDict(TypedDict): fileDimensions: Optional[int] folder: NotRequired[Optional[Union[str, Path]]] crs: Optional[str] region: NotRequired[ee.geometry.Geometry] description: str fileNamePrefix: str scale: Union[int, float] maxPixels: Union[int, float] bucket: NotRequired[Optional[str]]
DEFAULTS = dict( CRS='EPSG:4326', TILE_SIZE=12800, # multiple of shardSize default 256 EXPORT_FOLDER="ee_exports", SCALE=30 )
[docs] def download_images_in_folder(source_location: Union[str, Path], download_location: Union[str, Path], prefix: str) -> None: """ Handles pulling data from Google Drive over to a local location, filtering by a filename prefix and folder Args: source_location (Union[str, Path]): Directory to search for files. download_location (Union[str, Path]): Destination for files with the specified prefix. prefix (str): A prefix to use to filter items in the folder - only files where the name matches this prefix will be moved. Returns: None """ folder_search_path: Union[str, Path] = source_location files = [filename for filename in os.listdir(folder_search_path) if filename.startswith(prefix)] if len(files) == 0: print(f"Likely Error: Could not find files to download for {prefix} in {folder_search_path} - you likely have a misconfiguration in your export parameters. Future steps may fail.") os.makedirs(download_location, exist_ok=True) for filename in files: shutil.move(str(os.path.join(folder_search_path, filename)), str(os.path.join(download_location, filename)))
[docs] class TaskRegistry: """ The TaskRegistry class makes it convenient to manage arbitrarily many Earth Engine images that are in varying states of being downloaded. """ INCOMPLETE_STATUSES = ("READY", "UNSUBMITTED", "RUNNING") COMPLETE_STATUSES = ["COMPLETED"] FAILED_STATUSES = ["CANCEL_REQUESTED", "CANCELLED", "FAILED"] def __init__(self) -> None: """ Initialized the TaskRegistry class and defaults images to "[]" and the callback function to "None" Returns: None """ self.images: List[EEDLImage] = [] self.callback: Optional[str] = None self.log_file_path: Optional[Union[str, Path]] = None # the path to the log file self.log_file: Optional[io.TextIOWrapper] = None # the open log file handle self.raise_errors: bool = True
[docs] def add(self, image: "EEDLImage") -> None: """ Adds an Earth Engine image to the list of Earth Engine images. Args: image (ee.image.Image): Earth Engine image to be added to the list of images Returns: None """ self.images.append(image)
@property def incomplete_tasks(self) -> List["EEDLImage"]: """ List of Earth Engine images that have not been completed yet. Returns: List[ee.image.Image]: List of Earth Engine images that have not been completed yet. """ initial_tasks = [image for image in self.images if image.last_task_status['state'] in self.INCOMPLETE_STATUSES] for image in initial_tasks: # update anything that's currently running or waiting first image._check_task_status() return [image for image in self.images if image.last_task_status['state'] in self.INCOMPLETE_STATUSES] @property def complete_tasks(self) -> List["EEDLImage"]: """ List of Earth Engine images. Returns: List[ee.image.Image]: List of Earth Engine images. """ return [image for image in self.images if image.last_task_status['state'] in self.COMPLETE_STATUSES + self.FAILED_STATUSES] @property def failed_tasks(self) -> List["EEDLImage"]: """ List of Earth Engine images that have either been cancelled or that have failed Returns: List[ee.image.Image]: List of Earth Engine images that have failed or have been cancelled. """ return [image for image in self.images if image.last_task_status['state'] in self.FAILED_STATUSES] @property def downloadable_tasks(self) -> List["EEDLImage"]: """ List of Earth Engine images that have not been cancelled or have failed. Returns: List[ee.image.Image]: List of Earth Engine images that have not been cancelled or have failed. """ return [image for image in self.complete_tasks if image.task_data_downloaded is False and image.last_task_status['state'] not in self.FAILED_STATUSES]
[docs] def download_ready_images(self, download_location: Union[str, Path]) -> None: """ Downloads all images that are ready to be downloaded. Args: download_location (Union[str, Path]): Destination for downloaded files. Returns: None """ for image in self.downloadable_tasks: try: print(f"{image.filename} is ready for download") image.download_results(download_location=download_location, callback=self.callback) except: # noqa: E722 # on any error raise or log it if self.raise_errors: raise error_details = traceback.format_exc() self.log_error("local", f"Failed to process image {image.filename}. Error details: {error_details}")
[docs] def setup_log(self, log_file_path: Union[str, Path], mode='a'): self.log_file_path = log_file_path self.log_file = open(self.log_file_path, 'a')
[docs] def log_error(self, error_type: str, error_message: str): """ Args: error_type (str): Options "ee", "local" to indicate whether it was an error on Earth Engine's side or on the local processing side error_message (str): The error message to print to the log file Returns: None """ message = f"{error_type} Error: {error_message}" date_string = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") print(message) if self.log_file: self.log_file.write(f"{date_string}: {message}")
def __del__(self): if self.log_file is not None: try: self.log_file.close() except: # noqa: E722 # If we get any exception while closing it, don't make noise, just move on. We're just trying to be clean here where we can pass
[docs] def wait_for_images(self, download_location: Union[str, Path], sleep_time: int = 10, callback: Optional[str] = None, try_again_disk_full: bool = True, on_failure: str = "log") -> None: """ Tells EEDL to wait until there are no more incomplete or downloadable tasks left. It will block execution of any following code until all images have been downloaded and processed. Any code that runs afterward can rely on the images being downloaded to disk. Args: download_location (Union[str, Path]): Destination for downloaded files. sleep_time (int): Time between checking if downloads are available (complete) in seconds. Defaults to 10 seconds. This is also the interval where task statuses are updated on image objects. callback (Optional[str]): Optional callback function. Executed after image has been downloaded and allows for processing of completed images even while waiting for other images to complete on Earth Engine's servers. try_again_disk_full (bool): Will continuously retry to download images that are ready, even if it fails to do so initially because the disk is full. This allows you to get the warning that the disk is full, then clear out space to allow processing to complete, without restarting the exports or processing. on_failure (str): ***Needs language*** Returns: None """ if on_failure == "raise": self.raise_errors = True elif on_failure == "log" and self.log_file: # if they say to log the errors and specified a log file, set raise errors to False self.raise_errors = False self.callback = callback while len(self.incomplete_tasks) > 0 or len(self.downloadable_tasks) > 0: try: self.download_ready_images(download_location) except OSError: if try_again_disk_full: print("OSError reported. Invalid disk or the disk may be full - will try again - clear space") pass else: raise time.sleep(sleep_time) if len(self.failed_tasks) > 0: message = f"{len(self.failed_tasks)} image(s) failed to export. Example error message from first" \ f" failed image \"{self.failed_tasks[0].last_task_status['description']}\" was" \ f" \"{self.failed_tasks[0].last_task_status['error_message']}\"." \ f" Check https://code.earthengine.google.com/tasks in your web browser to see status and" \ f" messages for all export tasks." if on_failure == "raise": raise EEException(message) else: print(message)
main_task_registry = TaskRegistry()
[docs] class EEDLImage: """ The main class that does all the work. Any use of this package ultimately instantiates this class for each export the user wants to do. As we refine this, we may be able to provide just a single function in this module named "export" or something of that sort for people who don't need access to control class behavior. That will likely follow all the other enhancements, like converting the exports into async code. The class has no required arguments as of 6/16/2023, but that may change. Any arguments provided get applied directly to the class and override any defaults. It may be good to set :code:`crs` and :code:`scale` here to make sure they are set correctly for the images you plan to export, but those can also be provided as kwargs to the :code:`.export` method. Note that in the arguments below, the ones prefixed by zonal are only used if you configure the :code:`mosaic_and_zonal` callback on the TaskRegistry. Otherwise, when calling the :code:`zonal_stats` method, you need to provide the parameters there. Options at class instantiation include: Args: crs (Optional[str]): Coordinate Reference System to use for exports in a format Earth Engine understands, scale (Optional[int]): Scale parameter to pass to Earth Engine for export. Defaults to 30 tile_size (Optional[int]): The number of pixels per side of tiles to export export_folder (Optional[Union[str, Path]]): The name of the folder in the chosen export location that will be created for the export cloud_bucket (Optional[str]): The name of the Google Cloud storage bucket to use for exports - setting this parameter doesn't automatically configure output to the bucket. When running :code:`.export` your also need to specify a cloud export (instead of a `drive` export) output_folder (Optional[Union[str, Path]]): The folder, local to your system running the code, to export the finished images and optional zonal statistics files to. zonal_polygons: Optional[Union[str, Path]]: The path to a fiona-compatible polygon vector data file (e.g. a shapefile, geopackage layer, or other). Only used with the :code:`mosaic_and_zonal` callback. See note above. zonal_stats_to_calc: Optional[Tuple]: Tuple of zonal statistics to calculate. For example :code:`('min', 'max', 'mean')`. See the `documentation for the rasterstats package <https://pythonhosted.org/rasterstats/>`_ for full options. Only used with the :code:`mosaic_and_zonal` callback. See note above. zonal_keep_fields: Optional[Tuple]: Which fields should be preserved (passed through) from the spatial input data to the output zonal data. You will want to at least define the row's ID/key value here (in a tuple, such as :code:`('ID',)` so you can join the zonal stats back to spatial data, but you can optionally include any other fields as well. Only used with the :code:`mosaic_and_zonal` callback. See note above. zonal_use_points: bool: Switch rasterstats to extract using gen_point_query instead of gen_zonal_stats. See rasterstats package documentation for complete information. Get_point_query will get the values of a raster at all vertex locations when provided with a polygon or line. If provided points, it will extract those point values. We set interpolation to the nearest to perform an exact extraction of the cell values. In this codebase's usage, it's assumed that the "features" parameter to this function will be a points dataset (still in the same CRS as the raster) when use_points is True. Additionally, when this is True, the `stats` argument to this function is ignored as only a single value will be extracted as the attribute `value` in the output CSV. Default is False. Only used with the :code:`mosaic_and_zonal` callback. See note above. zonal_output_filepath: Optional[Union[str, Path]]: Only used with the :code:`mosaic_and_zonal` callback. See note above. zonal_inject_constants: dict: Only used with the :code:`mosaic_and_zonal` callback. See note above. zonal_nodata_value: int: Only used with the :code:`mosaic_and_zonal` callback. See note above. zonal_all_touched: bool: Only used with the :code:`mosaic_and_zonal` callback. See note above. """ def __init__(self, **kwargs) -> None: """ Initializes many class variables and sets provided kwargs. Returns: None """ self.drive_root_folder: Optional[Union[str, Path]] = None self.crs: Optional[str] = None self.tile_size: Optional[int] = None self.export_folder: Optional[Union[str, Path]] = None self.mosaic_image: Optional[Union[str, Path]] = None self.task: Optional[ee.batch.Task] = None self.cloud_bucket: Optional[str] = None self._ee_image: Optional[ee.image.Image] = None self.output_folder: Optional[Union[str, Path]] = None self.task_registry = main_task_registry self.scale: Union[int, float] = 1 self.filename_description = "" self.date_string = "" # For items that want to store a date representation. # These values are only used if someone calls the mosaic_and_zonal callback - we need the values defined on. # The class to do that. self.zonal_polygons: Optional[Union[str, Path]] = None self.zonal_stats_to_calc: Optional[Tuple] = None self.zonal_keep_fields: Optional[Tuple] = None self.zonal_use_points: bool = False self.zonal_output_filepath: Optional[Union[str, Path]] = None # set by self.zonal_stats self.zonal_inject_constants: dict = dict() self.zonal_nodata_value: int = -9999 self.zonal_all_touched: bool = False # Set the defaults here - this is a nice strategy where we get to define constants near the top that aren't buried in code, then apply them here. for key in DEFAULTS: setattr(self, key.lower(), DEFAULTS[key]) for key in kwargs: # Now apply any provided keyword arguments over the top of the defaults. setattr(self, key, kwargs[key]) self._last_task_status = {"state": "UNSUBMITTED"} # This will be the default status initially, so always assume it's UNSUBMITTED if we haven't gotten anything. # From the server. "None" would work too, but then we couldn't just check the status. self.task_data_downloaded = False self.export_type = "Drive" # The other option is "Cloud". def _set_names(self, filename_suffix: str = "") -> None: """ Args: filename_suffix (str): Suffix used to later identify files. Returns: None """ self.description = filename_suffix self.filename = f"{self.filename_description}_{filename_suffix}" @staticmethod def _initialize() -> None: """ Handles the initialization and potentially the authentication of Earth Engine. Returns: None """ try: # Try just a basic discard-able operation used in their docs so that we don't initialize if we don't need to. _ = ee.Image("NASA/NASADEM_HGT/001") except EEException: # If it fails, try just running initialize. try: ee.Initialize() except EEException: # If that still fails, try authenticating first. ee.Authenticate() ee.Initialize() @property def last_task_status(self) -> Dict[str, str]: """ Returns the status of the image's task on Earth Engine's servers, as last reported when it was checked. Calling this method does not re-check the task status. Instead, those checks are scheduled via the TaskRegistry and stored for access here. The task status reported by Earth Engine comes back as a dictionary, which is what is stored here. We pay the most attention to the "STATE" key, which indicates where processing of the export is in the lifecycle. See https://developers.google.com/earth-engine/guides/processing_environments#task_lifecycle for more information on task lifecycles and possible values. The following document from Earth Engine further has an example with the full set of keys and example values for a task status dictionary: https://developers.google.com/earth-engine/guides/python_install#create-an-export-task: Returns: Dict[str, str]: Return the private variable "_last_task_status" """ return self._last_task_status @last_task_status.setter def last_task_status(self, new_status: Dict[str, str]) -> None: """ Sets the value of the private variable "_last_task_status" to a specified value. Realistically, this shouldn't be used as the value should only be set from within the object, but it's here in case it's needed. Args: new_status (Dict[str, str]): Status to update the _last_task_status to. Returns: None """ self._last_task_status = new_status
[docs] def export(self, image: ee.image.Image, filename_suffix: str, export_type: str = "drive", clip: Optional[ee.geometry.Geometry] = None, strict_clip: Optional[bool] = False, drive_root_folder: Optional[Union[str, Path]] = None, **export_kwargs: Unpack[EEExportDict]) -> None: """ Handles the exporting of an image. Determines the values to provide to tile and export the image, then calls Earth Engine's image export functionality, creates an export task, and starts the task running. Importantly, this code *does not* wait for the task to finish exporting, or do any downloading of the image. It just starts the export process on EE's servers in a way that can be tracked later. This is by design because if you want to export many images, what is most efficient is to start all of the image tasks, and then wait for all of them at once. The :code:`drive_root_folder` parameter must be set if :code:`export_type` is :code:`drive`, but is optional when :code:`export_type` is :code:`cloud`. When :code:`export_type` is :code:`cloud` you must provide the name of the Google Cloud storage bucket to export to in the :code:`bucket` parameter. Both export types have configuration requirements and limitations discussed in :ref:`ExportLocations`. Note that this method returns None - if you wish to save an object for tracking and to obtain the path to the mosaicked image after everything is downloaded, keep the whole class instance object (or save it into a list, etc). Args: image (ee.image.Image): Image for export. filename_suffix (str): The unique identifier used internally to identify images. export_type (str): Specifies how the image should be exported. Either "cloud" or "drive". Defaults to "drive". clip (Optional[ee.geometry.Geometry]): Defines the region of interest for export - does not perform a strict clip, which is often slower. Instead, it uses the Earth Engine export's "region" parameter to clip the results to the bounding box of the clip geometry. To clip to the actual geometry, set strict_clip to True. strict_clip (Optional[bool]): When set to True, performs a true clip on the result so that it's not just the bounding box but also the actual clipping geometry. Defaults to False. drive_root_folder (Optional[Union[str, Path]]): The folder on your computer that has the root of your Google Drive installation (e.g. :code:`G:\\My Drive` on Windows) if "drive" is the provided export type. bucket (Optional[str]): The Google Cloud bucket to place the exported images into if using the `cloud` export_type. export_kwargs (Unpack[EExportDict]): An optional dictionary of keyword arguments that gets passed directly to Earth Engine's image export method. Overrides any values EEDL manually calculates, if a key is set here that is also set elsewhere (such as on the class or derived in the method). Returns: None """ if not isinstance(image, ee.image.Image): raise ValueError("Invalid image provided for export - please provide a single image (not a collection or another object) of class ee.image.Image for export") if export_type.lower() == "drive" and \ (self.drive_root_folder is None or not os.path.exists(self.drive_root_folder)) and \ (drive_root_folder is None or not os.path.exists(drive_root_folder)): raise NotADirectoryError("The provided path for the Google Drive export folder is not a valid directory but" " Drive export was specified. Either change the export type to use Google Cloud" " and set that up properly (with a bucket, etc), or set the drive_root_folder" " to a valid folder.") elif export_type.lower() == "drive": if drive_root_folder: self.drive_root_folder = drive_root_folder self._initialize() if clip and not isinstance(clip, ee.geometry.Geometry): raise ValueError("Invalid geometry provided for clipping. Export parameter `clip` must be an instance of ee.geometry.Geometry") if clip and strict_clip and isinstance(clip, ee.geometry.Geometry): # image = image.clip(clip) self._ee_image = image self._set_names(filename_suffix) ee_kwargs: EEExportDict = { 'description': self.description, 'fileNamePrefix': self.filename, 'scale': self.scale, 'maxPixels': 1e12, 'fileDimensions': self.tile_size, 'crs': self.crs } if isinstance(clip, ee.geometry.Geometry): ee_kwargs["region"] = clip # Override any of these defaults with anything else provided. ee_kwargs.update(export_kwargs) if "folder" not in ee_kwargs: # If they didn't specify a folder, use the class' default or whatever they defined previously. ee_kwargs['folder'] = self.export_folder else: self.export_folder = ee_kwargs['folder'] # We need to persist this, so we can find the image later on, and so it's picked up by cloud export code below. if export_type.lower() == "drive": self.task = ee.batch.Export.image.toDrive(self._ee_image, **ee_kwargs) elif export_type.lower() == "cloud": # Add the folder to the filename here for Google Cloud. ee_kwargs['fileNamePrefix'] = f"{self.export_folder}/{ee_kwargs['fileNamePrefix']}" if "bucket" not in ee_kwargs: # If we already defined the bucket on the class, use that. ee_kwargs['bucket'] = self.cloud_bucket else: # Otherwise, attempt to retrieve it from the call to this function. self.cloud_bucket = str(ee_kwargs['bucket']) if "folder" in ee_kwargs: # We made this part of the filename prefix above, so delete it now, or it will cause an error. del ee_kwargs["folder"] self.task = ee.batch.Export.image.toCloudStorage(self._ee_image, **ee_kwargs) # Export_type is not valid else: raise ValueError("Invalid value for export_type. Did you mean \"drive\" or \"cloud\"?") self.task.start() self.export_type = export_type self.task_registry.add(self)
[docs] @staticmethod def check_mosaic_exists(download_location: Union[str, Path], export_folder: Union[str, Path], filename: str): """ This function isn't ideal because it duplicates information - you need to pass it in elsewhere and assume this file format matches, rather than actually calculating the paths earlier in the process. But that's currently necessary because the task registry sets the download location right now. So we want to be able to check at any time if the mosaic exists so that we can skip processing - we're using this. Otherwise, we'd need to do a big refactor that's probably not worth it. """ output_file = os.path.join(str(download_location), str(export_folder), f"{filename}_mosaic.tif") return os.path.exists(output_file)
[docs] def download_results(self, download_location: Union[str, Path], callback: Optional[str] = None, drive_wait: int = 15) -> None: """ Handles the download and optional postprocessing of the current image to the folder specified :code:`download_location`. Users of EEDL won't need to invoke this except in very advanced situations. The Task Registry automatically invokes this method when it detects that the image has completed exporting. Args: download_location (Union[str, Path]): The directory where the results should be downloaded to. Expects a string path or a Pathlib Path object. callback (Optional[str]): The callback function is called once the image has been downloaded. drive_wait (int): The amount of time in seconds to wait to allow for files that Earth Engine reports have been exported to actually populate. Default is 15 seconds. Returns: None """ # Need an event loop that checks self.task.status(), which will get the current state of the task. # state options # == "CANCELLED", "CANCEL_REQUESTED", "COMPLETED", # "FAILED", "READY", "SUBMITTED" (maybe - double check that - it might be that it waits with UNSUBMITTED), # "RUNNING", "UNSUBMITTED" self.output_folder = os.path.join(str(download_location), str(self.export_folder)) if self.export_type.lower() == "drive": time.sleep(drive_wait) # It seems like there's often a race condition where EE reports export complete, but no files are found. Give things a short time to sync up. folder_search_path = os.path.join(str(self.drive_root_folder), str(self.export_folder)) download_images_in_folder(folder_search_path, self.output_folder, prefix=self.filename) elif self.export_type.lower() == "cloud": google_cloud.download_public_export(str(self.cloud_bucket), self.output_folder, f"{self.export_folder}/{self.filename}") else: raise ValueError("Unknown export_type (not one of 'drive', 'cloud') - can't download") self.task_data_downloaded = True if callback: callback_func = getattr(self, callback) callback_func()
[docs] def mosaic(self) -> None: """ Mosaics the individual pieces of the image into the complete image. EEDL works by configuring Earth Engine to tile large exports - while Earth Engine has limits on individual image sizes for export, it can slice large images into tiles so that each individual image fits within those limits. EEDL sets Earth Engine to tile exports, then tracks the individual pieces (whether one or thousands) through downloading. In this function, it then mosaics those pieces back into one image you can use locally, so you don't need to handle the individual tiles (and all of their edges). Returns: None """ self.mosaic_image = os.path.join(str(self.output_folder), f"{self.filename}_mosaic.tif") mosaic_rasters.mosaic_folder(str(self.output_folder), self.mosaic_image, prefix=self.filename)
[docs] def mosaic_and_zonal(self) -> None: """ A callback that takes no parameters, but runs both the mosaic and zonal stats methods. Runs zonal stats by allowing the user to set all the zonal params on the class instance instead of passing them as params. Users of EEDL *could* invoke this, but it's really designed to be invoked via a callback on the Task Registry, and for any prior code to configure the zonal statistics extractions. """ if not (self.zonal_polygons and self.zonal_keep_fields and self.zonal_stats_to_calc): raise ValueError("Can't run mosaic and zonal callback without `polygons`, `keep_fields, and `stats` values" "set on the class instance.") try: use_points = self.zonal_use_points except AttributeError: use_points = False self.mosaic() self.zonal_stats(polygons=self.zonal_polygons, keep_fields=self.zonal_keep_fields, stats=self.zonal_stats_to_calc, use_points=use_points, inject_constants=self.zonal_inject_constants, nodata_value=self.zonal_nodata_value, all_touched=self.zonal_all_touched, )
[docs] def zonal_stats(self, polygons: Union[str, Path], keep_fields: Tuple[str, ...] = ("UniqueID", "CLASS2"), stats: Tuple[str, ...] = ('min', 'max', 'mean', 'median', 'std', 'count', 'percentile_10', 'percentile_90'), report_threshold: int = 1000, write_batch_size: int = 2000, use_points: bool = False, inject_constants: Optional[dict] = None, nodata_value: int = -9999, all_touched: bool = False ) -> None: """ Args: polygons (Union[str, Path]): keep_fields (tuple[str, ...]): stats ( tuple[str, ...]): report_threshold (int): After how many iterations should it print out the feature number it's on. Defaults to 1000. Set to None to disable. write_batch_size (int): How many zones should we store up before writing to the disk? Defaults to 2000. use_points (bool): inject_constants(Optional[dict]): nodata_value (int): all_touched (bool): Returns: None """ if inject_constants is None: inject_constants = dict() self.zonal_output_filepath = zonal.zonal_stats( polygons, self.mosaic_image, self.output_folder, self.filename, keep_fields=keep_fields, stats=stats, report_threshold=report_threshold, write_batch_size=write_batch_size, use_points=use_points, inject_constants=inject_constants, nodata_value=nodata_value, all_touched=all_touched )
def _check_task_status(self) -> Dict[str, Union[Dict[str, str], bool]]: """ Updates the status if it needs to be changed Returns: Dict[str, Union[Dict[str, str], bool]]: Returns a dictionary of the most up-to-date status and whether that status was changed """ if self.task is None: raise ValueError('Error checking task status. Task is None. It likely means that the export task was not' ' properly created and the code needs to be re-run.') new_status = self.task.status() changed = False if self.last_task_status != new_status: changed = True self.last_task_status = new_status return {'status': self.last_task_status, 'changed': changed}