Welcome to pytroll-watchers’s documentation!

Pytroll-watchers is a library and command-line tool to detect changes on a local or remote file system.

At the moment we support local filesystems and Minio S3 buckets through bucket notifications.

CLI

The command-line tool can be used by invoking pytroll-watcher <config-file>. An example config-file can be:

backend: minio
fs_config:
  endpoint_url: my_endpoint.pytroll.org
  bucket_name: satellite-data-viirs
  storage_options:
    profile: profile_for_credentials
publisher_config:
  name: viirs_watcher
message_config:
  subject: /segment/viirs/l1b/
  atype: file
  data:
    sensor: viirs
  aliases:
    platform_name:
      npp: Suomi-NPP
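
As an illustration of the aliases item in the config above, the sketch below shows one plausible way such a mapping could be applied to file metadata before it is placed in a message. The helper name apply_aliases_sketch is hypothetical, and the replacement semantics (substitute a metadata value when an alias is defined for it) are an assumption drawn from the example config, not a description of the library internals.

```python
def apply_aliases_sketch(aliases, metadata):
    """Return a copy of metadata with aliased values substituted (hypothetical helper)."""
    result = dict(metadata)
    for key, replacements in aliases.items():
        if key in result:
            # Keep the original value when no alias is defined for it.
            result[key] = replacements.get(result[key], result[key])
    return result


# Values taken from the example config above.
aliases = {"platform_name": {"npp": "Suomi-NPP"}}
metadata = {"platform_name": "npp", "sensor": "viirs"}
aliased = apply_aliases_sketch(aliases, metadata)
```

Under this assumption, a file whose name yields platform_name "npp" would be advertised with platform_name "Suomi-NPP", while the other metadata items stay untouched.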

Published messages

The published messages will contain information on how to access the advertised resource. The following parameters will be present in the message.

uid

This is the unique identifier for the resource. In general, it is the basename of the file or object, since we assume that two files with the same name will have the same content. In some cases it can include the containing directory.

Examples of uids:

  • SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5

  • S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiances.nc
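
The two uid shapes above can be reproduced with a small sketch. The helper uid_sketch and its include_parent flag are hypothetical names introduced for illustration only; how the library decides when to include the containing directory is not specified here.

```python
from pathlib import PurePosixPath


def uid_sketch(path, include_parent=False):
    """Build a uid from the basename, optionally prefixed by the containing directory."""
    pure = PurePosixPath(path)
    if include_parent:
        return f"{pure.parent.name}/{pure.name}"
    return pure.name


plain_uid = uid_sketch(
    "/viirs-data/sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5"
)
nested_uid = uid_sketch(
    "/files/S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiances.nc",
    include_parent=True,
)
```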

uri

This is the URI that can be used to access the resource. For more complex cases, the URI can be chained from several parts, as fsspec allows.

Examples of uris:

  • s3://viirs-data/sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5

  • zip://sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5::s3://viirs-data/viirs_sdr_npp_d20240408_t1006227_e1007469_b64498.zip

  • https://someplace.com/files/S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiances.nc
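
To make the chained form in the second example easier to read, here is a minimal sketch that splits such a URI on the "::" separator used by fsspec for chaining. The helper split_chained_uri is a hypothetical name, and the sketch only inspects the string; it does not open anything.

```python
def split_chained_uri(uri):
    """Split an fsspec-style chained URI into its layers, outermost first."""
    return uri.split("::")


layers = split_chained_uri(
    "zip://sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5"
    "::s3://viirs-data/viirs_sdr_npp_d20240408_t1006227_e1007469_b64498.zip"
)
# The protocol of each layer is the part before "://".
protocols = [layer.split("://", 1)[0] for layer in layers]
```

Here the first layer names the member inside the zip archive and the second layer gives the location of the archive itself.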

Sometimes the URI is not enough to gain access to the resource, for example when the hosting service requires authentication. This is why pytroll-watchers will also provide the filesystem and the path items. The filesystem parameter is the fsspec JSON representation of the filesystem. It can be used on the recipient side with, e.g.:

fsspec.AbstractFileSystem.from_json(json.dumps(fs_info))

where fs_info is the content of the filesystem parameter.

To pass authentication parameters to the filesystem, use the storage_options configuration item.

Example of filesystem:

  • {"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [], "profile": "someprofile"}
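
A recipient could combine the filesystem and path items roughly as follows. This is a sketch under assumptions: the message_data dictionary is a hypothetical payload assembled from the examples in this section, the helper open_resource is not part of pytroll-watchers, and fsspec (with s3fs) is assumed to be installed on the recipient side.

```python
import json


def open_resource(message_data):
    """Open the advertised resource from the filesystem and path items."""
    import fsspec  # imported lazily; only needed when actually opening the resource

    fs = fsspec.AbstractFileSystem.from_json(json.dumps(message_data["filesystem"]))
    return fs.open(message_data["path"], "rb")


# Hypothetical message content assembled from the examples in this section.
message_data = {
    "uid": "SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5",
    "uri": "s3://viirs-data/sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5",
    "filesystem": {"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [], "profile": "someprofile"},
    "path": "/viirs-data/sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5",
}
# This is the string that would be handed to fsspec.
fs_json = json.dumps(message_data["filesystem"])
```

Calling open_resource(message_data) would then return a file-like object, provided the named profile exists in the recipient's own credentials configuration.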

Warning

Pytroll-watchers tries to prevent publishing of sensitive information such as passwords and secret keys, and will raise an error in most cases when this is done. However, always double-check your pytroll-watchers configuration so that secrets are not passed to the library to start with. Solutions include using ssh-agent for ssh-based filesystems, or storing credentials in .aws config files for s3 filesystems. For http-based filesystems implemented in pytroll-watchers, the username and password are used to generate a token prior to publishing, and will thus not be published.
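
One way to double-check a configuration before handing it to the library is a small pre-flight scan for secret-looking keys. The sketch below is an illustration only: the helper find_suspicious_keys and the list of key fragments are assumptions made for this example, and it is not the check that pytroll-watchers itself performs.

```python
SUSPICIOUS_KEY_PARTS = ("password", "secret", "token")  # assumed list, extend as needed


def find_suspicious_keys(config, prefix=""):
    """Return dotted paths of config keys whose names suggest they hold a secret."""
    hits = []
    for key, value in config.items():
        dotted = f"{prefix}{key}"
        if any(part in str(key).lower() for part in SUSPICIOUS_KEY_PARTS):
            hits.append(dotted)
        if isinstance(value, dict):
            # Recurse into nested sections such as storage_options.
            hits.extend(find_suspicious_keys(value, prefix=f"{dotted}."))
    return hits


safe_config = {"endpoint_url": "my_endpoint.pytroll.org",
               "storage_options": {"profile": "profile_for_credentials"}}
risky_config = {"endpoint_url": "my_endpoint.pytroll.org",
                "storage_options": {"secret": "hypothetical-value"}}
```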

path

This parameter is the companion to filesystem and gives the path to the resource within the filesystem.

Examples of paths:

  • /viirs-data/sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5

  • /sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5

  • /files/S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiances.nc

API

Main interface

Main package file for pytroll watchers.

Local watcher

Watcher for non-remote file systems.

Either using OS-based events (like inotify on linux), or polling.
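
To show what polling means in contrast to OS events, here is a minimal standard-library sketch that compares successive directory listings. It is not how the local watcher is implemented (the parameter description below refers to watchdog observers); the helper poll_new_files is a hypothetical name.

```python
import os
import tempfile


def poll_new_files(directory, seen):
    """Return names that appeared in directory since the last call, updating seen in place."""
    current = set(os.listdir(directory))
    new_files = sorted(current - seen)
    seen.update(new_files)
    return new_files


with tempfile.TemporaryDirectory() as watched_dir:
    seen = set(os.listdir(watched_dir))
    # Simulate a file arriving between two polls.
    open(os.path.join(watched_dir, "20200428_1000_foo.tif"), "w").close()
    first_poll = poll_new_files(watched_dir, seen)
    second_poll = poll_new_files(watched_dir, seen)
```

Polling trades latency for portability: a new file is only noticed at the next poll, but the approach works on filesystems that do not deliver change events.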

pytroll_watchers.local_watcher.file_generator(directory, observer_type='os', file_pattern=None, protocol=None, storage_options=None)

Generate new files appearing in the watched directory.

Parameters:
  • directory – The locally accessible directory to watch for changes.

  • observer_type – What to use for detecting changes. It can be either “os” for OS-based detection (e.g. inotify on linux, but it can be polling on some operating systems), “polling” for detecting changes through polling, or the actual watchdog class to use as observer. Defaults to “os”.

  • file_pattern – The trollsift pattern to use for matching and extracting metadata from the filename. This must not include the directory.

  • protocol (optional) – In case the file has to be advertised with another protocol than “file”.

  • storage_options – The storage options for the other protocol. Will be ignored if protocol is None.

Returns:

A tuple of Path or UPath and file metadata.
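
The kind of metadata a file_pattern extracts can be pictured with a standard-library stand-in. The sketch below hand-translates the pattern "{start_time:%Y%m%d_%H%M}_{product}.tif" into a regular expression; it does not use trollsift, and parse_filename_sketch is a hypothetical helper, so treat it as an approximation of the behaviour rather than the library's parser.

```python
import re
from datetime import datetime

# Regex counterpart of "{start_time:%Y%m%d_%H%M}_{product}.tif" (hand-written for this sketch).
FILENAME_RE = re.compile(r"(?P<start_time>\d{8}_\d{4})_(?P<product>.+)\.tif")


def parse_filename_sketch(filename):
    """Extract start_time and product from a filename, or return None if it does not match."""
    match = FILENAME_RE.fullmatch(filename)
    if match is None:
        return None
    return {
        "start_time": datetime.strptime(match["start_time"], "%Y%m%d_%H%M"),
        "product": match["product"],
    }


file_metadata = parse_filename_sketch("20200428_1000_foo.tif")
```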

Examples

To iterate over new files in /tmp/:

>>> for filename, metadata in file_generator("/tmp/", file_pattern="{start_time:%Y%m%d_%H%M}_{product}.tif"):
...    print(filename)
Path("/tmp/20200428_1000_foo.tif")

To get UPaths with ssh as protocol and a specific host:

>>> for filename, metadata in file_generator("/tmp/", file_pattern="{start_time:%Y%m%d_%H%M}_{product}.tif",
...                                          protocol="ssh", storage_options=dict(host="myhost.pytroll.org")):
...    print(filename)
UPath("ssh:///tmp/20200428_1000_foo.tif")  # .storage_options will show the host.

pytroll_watchers.local_watcher.file_publisher(fs_config, publisher_config, message_config)

Publish files coming from local filesystem events.

Parameters:
  • fs_config – the configuration for the filesystem watching, will be passed as argument to file_generator.

  • publisher_config – The configuration dictionary to pass to the posttroll publishing functions.

  • message_config – The information needed to complete the posttroll message generation. Will be amended with the file metadata, and passed directly to posttroll’s Message constructor.
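
Put together, the three arguments could look like the dictionaries below. This is a sketch assuming the same keys as the CLI config example near the top of this page; the values are illustrative, and the wrapper run_publisher is a hypothetical name that is defined but deliberately not called here, since publishing watches the directory until interrupted.

```python
# Keys of fs_config mirror the parameters of local_watcher.file_generator.
fs_config = {
    "directory": "/tmp/",
    "file_pattern": "{start_time:%Y%m%d_%H%M}_{product}.tif",
}
publisher_config = {"name": "viirs_watcher"}
message_config = {
    "subject": "/segment/viirs/l1b/",
    "atype": "file",
    "data": {"sensor": "viirs"},
}


def run_publisher():
    """Start publishing; this keeps watching, so it is not called in this sketch."""
    from pytroll_watchers.local_watcher import file_publisher

    file_publisher(fs_config, publisher_config, message_config)
```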

Minio bucket notification watcher

Publish messages based on Minio bucket notifications.

The published messages will contain filesystem information generated by fsspec.

pytroll_watchers.minio_notification_watcher.file_generator(endpoint_url, bucket_name, file_pattern=None, storage_options=None)

Generate new objects appearing in the watched bucket.

Parameters:
  • endpoint_url – The endpoint_url to use.

  • bucket_name – The bucket to watch for changes.

  • file_pattern – The trollsift pattern to use for matching and extracting metadata from the object name. This must not include the prefix.

  • storage_options – The storage options for the service, for example for specifying a profile to the aws config.

Returns:

A tuple of UPath and metadata.
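
For orientation, S3-style bucket notification records carry the bucket name and the URL-encoded object key under the "s3" entry. The sketch below turns such a record into an s3:// URI. The record shown is a hypothetical, heavily trimmed example, and record_to_uri is not a function of this module; how the watcher itself processes records is not described here.

```python
from urllib.parse import unquote_plus


def record_to_uri(record):
    """Build an s3:// URI from one S3-style bucket notification record."""
    bucket = record["s3"]["bucket"]["name"]
    # Object keys arrive URL-encoded in the notification payload.
    key = unquote_plus(record["s3"]["object"]["key"])
    return f"s3://{bucket}/{key}"


# Hypothetical, heavily trimmed record; real notifications carry many more fields.
record = {
    "s3": {
        "bucket": {"name": "viirs-data"},
        "object": {"key": "sdr%2FSVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5"},
    }
}
object_uri = record_to_uri(record)
```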

Examples

To iterate over new files in s3:///tmp/:

>>> for filename, metadata in file_generator("some_endpoint_url", "tmp",
...                                          file_pattern="{start_time:%Y%m%d_%H%M}_{product}.tif"):
...    print(filename)
UPath("s3:///tmp/20200428_1000_foo.tif")

pytroll_watchers.minio_notification_watcher.file_publisher(fs_config, publisher_config, message_config)

Publish objects coming from bucket notifications.

Parameters:
  • fs_config – the configuration for the filesystem watching, will be passed as argument to file_generator.

  • publisher_config – The configuration dictionary to pass to the posttroll publishing functions.

  • message_config – The information needed to complete the posttroll message generation. Will be amended with the file metadata, and passed directly to posttroll’s Message constructor.

Copernicus dataspace watcher

Module to provide file generator and publisher for the Copernicus dataspace contents.

It polls the catalogue using OData for new data (https://documentation.dataspace.copernicus.eu/APIs/OData.html) and generates locations for the data on the S3 services (https://documentation.dataspace.copernicus.eu/APIs/S3.html).

Note

The OData and S3 services require two different sets of credentials.

class pytroll_watchers.dataspace_watcher.CopernicusOAuth2Session(dataspace_auth)

An oauth2 session for copernicus dataspace.

fetch_token()

Fetch the token.

get(filter_string)

Run a get request.

pytroll_watchers.dataspace_watcher.file_generator(filter_string, polling_interval, dataspace_auth, start_from=None, storage_options=None)

Generate new objects by polling copernicus dataspace.

Parameters:
  • filter_string – the filter to use for narrowing the data to poll. For example, to poll level 1 olci data, it can be contains(Name,'OL_1_EFR'). For more information on the filter parameters, check: https://documentation.dataspace.copernicus.eu/APIs/OData.html

  • polling_interval – the interval (timedelta object or kwargs to timedelta) at which the dataspace will be polled.

  • dataspace_auth – the authentication information, as a dictionary. It can be a dictionary with username and password keys, or with netrc_host and optionally netrc_file if the credentials are to be fetched with netrc.

  • start_from – how far back in time to fetch the data the first time. This is helpful for the first iteration of the generator, so that data from the past can be fetched, to fill a possible gap. Defaults to 0, meaning nothing older than when the generator starts will be fetched. Same format accepted as polling_interval.

  • storage_options – The options to pass to the S3Path instance. These usually include ways to get credentials to the copernicus object store, like profile from the .aws configuration files.

Yields:

Tuples of UPath (s3) and metadata.
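
Since polling_interval and start_from accept either a timedelta object or keyword arguments for one, a caller-side normalisation could look like the sketch below. The helper as_timedelta is a hypothetical name; the library's own handling of these two forms may differ in detail.

```python
from datetime import timedelta


def as_timedelta(value):
    """Accept a timedelta or a dict of timedelta keyword arguments (hypothetical helper)."""
    if isinstance(value, timedelta):
        return value
    return timedelta(**value)


# Both accepted forms end up as timedelta objects.
polling_interval = as_timedelta({"minutes": 10})
start_from = as_timedelta(timedelta(hours=24))
```

The dictionary form is convenient in a YAML config file, where a timedelta object cannot be written directly.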

pytroll_watchers.dataspace_watcher.file_publisher(fs_config, publisher_config, message_config)

Publish files coming from the Copernicus dataspace.

Parameters:
  • fs_config – the configuration for the filesystem watching, will be passed as argument to file_generator.

  • publisher_config – The configuration dictionary to pass to the posttroll publishing functions.

  • message_config – The information needed to complete the posttroll message generation. Will be amended with the file metadata, and passed directly to posttroll’s Message constructor.

Generate download links for a given filter_string.

Generate download links for data that was published since a given last publication_date.

Example

To fetch download links published since yesterday, using netrc-stored credentials and an aws s3 profile:

>>> import datetime
>>> from pytroll_watchers.dataspace_watcher import generate_download_links_since
>>> filter_string = "contains(Name,'OL_1_EFR')"
>>> dataspace_auth = dict(netrc_host="dataspace.copernicus.eu")
>>> last_publication_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=24)
>>> storage_options = dict(profile="my_copernicus_s3_profile")
>>> generator = generate_download_links_since(filter_string, dataspace_auth, last_publication_date,
...                                           storage_options)

pytroll_watchers.dataspace_watcher.run_every(interval)

Generator that ticks every interval.

Parameters:
  • interval – the timedelta object giving the amount of time to wait between ticks. An interval of 0 will just make it tick once, then return.

Yields:

The time of the next tick.
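
The ticking behaviour described above can be sketched with the standard library as follows. The function run_every_sketch is a hypothetical re-creation written for this page, not the library's code, and the use of UTC timestamps is an assumption.

```python
import time
from datetime import datetime, timedelta, timezone


def run_every_sketch(interval):
    """Yield the time of the next tick, sleeping in between; tick once if interval is zero."""
    next_tick = datetime.now(timezone.utc) + interval
    while True:
        yield next_tick
        if not interval:
            return
        # Sleep for whatever remains until the scheduled tick, never a negative amount.
        remaining = (next_tick - datetime.now(timezone.utc)).total_seconds()
        time.sleep(max(remaining, 0))
        next_tick += interval


# A zero interval produces exactly one tick and then stops.
single_tick = list(run_every_sketch(timedelta(0)))
```

Scheduling each tick from the previous scheduled time, rather than from the moment the loop body finishes, keeps the ticks from drifting when the work between ticks takes a while.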

pytroll_watchers.dataspace_watcher.update_last_publication_date(last_publication_date, metadata)

Update the last publication date based on the metadata.

Testing utilities

Pytest fixtures and utilities for testing code that uses pytroll watchers.

pytroll_watchers.testing.load_oauth_responses(*responses_to_load, response_file=None)

Load the oauth responses for mocking the requests to copernicus dataspace.

Parameters:
  • responses_to_load – The responses to load.

  • response_file – The file where the responses are stored. Defaults to tests/dataspace_responses.yaml

Example

To get fake responses for the watcher and test the generator, one could use:

with load_oauth_responses("token", "filtered_yesterday"):
    files = list(file_generator(filter_string, check_interval, dataspace_auth, start_from=timedelta(hours=24)))

pytroll_watchers.testing.patched_bucket_listener(monkeypatch)

Patch the records produced by the underlying bucket listener.

Example

This context manager can be used like this:

>>> with patched_bucket_listener(records_to_produce):
...     for record in minio_notification_watcher.file_generator(endpoint, bucket):
...         pass  # do something with the record

pytroll_watchers.testing.patched_local_events(monkeypatch)

Patch the events produced by underlying os/polling watcher.

Example

The produced context manager can be used like this:

>>> with patched_local_events(["/tmp/file1", "/tmp/file2"]):
...    assert "/tmp/file1" in local_watcher.file_generator("/tmp")
