Available backends

Local watcher

Watcher for non-remote file systems.

It uses either OS-based events (like inotify on Linux) or polling.

An example configuration file to retrieve data from a directory.

By default, file URIs will not include any protocol, which means they will look like /tmp/myfile. If desired, the protocol setting in the fs_config section can be set to “file” to make the URIs look like file:///tmp/myfile.

It is also possible to advertise the local files as remote ones with the protocol and storage_options settings in the fs_config section. The generated URI can thus start with ssh://myhost, for example, by setting protocol: ssh and storage_options: {host: "myhost"}.

backend: local
fs_config:
  directory: /data
  file_pattern: "H-000-{orig_platform_name:4s}__-{orig_platform_name:4s}_{service:3s}____-{channel_name:_<9s}-{segment:_<9s}-{start_time:%Y%m%d%H%M}-{compression:1s}_"
publisher_config:
  name: hrit_watcher
message_config:
  subject: /segment/hrit/l1b/
  atype: file

Moreover, the file pattern can be provided as a list in case multiple formats are possible, e.g.:

backend: local
fs_config:
  directory: /data
  file_pattern:
    - "H-000-{start_time:%Y%m%d%H%M}.nc"
    - "H-000-{start_time:%Y%m%d%H%M}.NC"
...
pytroll_watchers.local_watcher.file_generator(directory, observer_type='os', file_pattern=None, protocol=None, storage_options=None)

Generate new files appearing in the watched directory.

Parameters:
  • directory – The locally accessible directory to watch for changes.

  • observer_type – What to use for detecting changes. It can be either “os” for OS-based detection (e.g. inotify on Linux, though this can be polling on some operating systems), “polling” for detecting changes through polling, or the actual watchdog class to use as observer. Defaults to “os”.

  • file_pattern – The trollsift pattern to use for matching and extracting metadata from the filename. This can include a directory.

  • protocol (optional) – The protocol to advertise the file with, in case it has to be something other than “file”.

  • storage_options – The storage options for the other protocol. Will be ignored if protocol is None.

Returns:

A tuple of Path or UPath and file metadata.

Examples

To iterate over new files in /tmp/:

>>> for filename in file_generator("/tmp/", file_pattern="{start_time:%Y%m%d_%H%M}_{product}.tif"):
...    print(filename)
Path("/tmp/20200428_1000_foo.tif")

To get UPaths with ssh as protocol and a specific host:

>>> for filename in file_generator("/tmp/", file_pattern="{start_time:%Y%m%d_%H%M}_{product}.tif",
...                                protocol="ssh", storage_options=dict(host="myhost.pytroll.org")):
...    print(filename)
UPath("ssh:///tmp/20200428_1000_foo.tif")  # .storage_options will show the host.
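
The URI forms described for the protocol and storage_options settings can be illustrated with a small standalone sketch. The advertised_uri function below is a hypothetical helper written for this illustration only; it is not part of pytroll_watchers, and it assumes the remote host is given under the "host" key of storage_options.

```python
from pathlib import PurePosixPath


def advertised_uri(path, protocol=None, storage_options=None):
    """Build the URI string a local file would be advertised with.

    Hypothetical helper for illustration only; not part of pytroll_watchers.
    """
    path = PurePosixPath(path)
    if protocol is None:
        # Default behaviour: no protocol, just the plain path.
        return str(path)
    # Assumption: the remote host, if any, is given under the "host" key.
    host = (storage_options or {}).get("host", "")
    return f"{protocol}://{host}{path}"


print(advertised_uri("/tmp/myfile"))  # /tmp/myfile
print(advertised_uri("/tmp/myfile", protocol="file"))  # file:///tmp/myfile
print(advertised_uri("/tmp/myfile", protocol="ssh",
                     storage_options={"host": "myhost"}))  # ssh://myhost/tmp/myfile
```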
pytroll_watchers.local_watcher.file_publisher(config)

Publish files coming from local filesystem events.

Parameters:
  • config – the configuration dictionary, containing in particular an fs_config section, which is the configuration for the filesystem watching and will be passed as arguments to file_generator. The other sections are passed further to file_publisher_from_generator.

Minio bucket notification watcher

Publish messages based on Minio bucket notifications.

The published messages will contain filesystem information generated by fsspec.

pytroll_watchers.minio_notification_watcher.file_generator(endpoint_url, bucket_name, file_pattern=None, storage_options=None)

Generate new objects appearing in the watched bucket.

Parameters:
  • endpoint_url – The endpoint_url to use.

  • bucket_name – The bucket to watch for changes.

  • file_pattern – The trollsift pattern to use for matching and extracting metadata from the object name. This can include the prefix if needed.

  • storage_options – The storage options for the service, for example for specifying a profile to the aws config.

Returns:

A tuple of UPath and metadata.

Examples

To iterate over new files in s3:///tmp/:

>>> for filename in file_generator("some_endpoint_url", "tmp",
...                                file_pattern="data/{start_time:%Y%m%d_%H%M}_{product}.tif"):
...    print(filename)
UPath("s3:///tmp/data/20200428_1000_foo.tif")
pytroll_watchers.minio_notification_watcher.file_publisher(config)

Publish objects coming from bucket notifications.

Parameters:
  • config – the configuration dictionary, containing in particular an fs_config section, which is the configuration for the filesystem watching and will be passed as arguments to file_generator. The other sections are passed further to file_publisher_from_generator.

S3 bucket notification watcher

Poller for S3 object stores.

Configuration example:

backend: s3
fs_config:
  bucket_name: sat/L1B
  file_pattern: "SAT_{platform_name}-{start_time:%Y%m%d%H%M%S}_{end_time:%Y%m%d%H%M%S}.nc"
  storage_options:
    profile: sat-store
  polling_interval:
    seconds: 10
  start_from:
    days: 1
publisher_config:
  name: sat_watcher
  nameservers: false
  port: 3000
message_config:
  subject: /segment/s1/l1b/
  atype: file
data_config:
  include_dir_in_uid: true

In this example, the credentials and endpoint are provided through a profile that needs to be defined in the .aws files, for example, in .aws/credentials:

...
[sat-store]
aws_access_key_id=...
aws_secret_access_key=...

and in .aws/config:

...
[profile sat-store]
services = sat-store-s3

[services sat-store-s3]
S3 =
  endpoint_url = https://sat.store.org
pytroll_watchers.s3_poller.file_generator(bucket_name: str, polling_interval: timedelta | dict[str, float], file_pattern: str | None = None, start_from: timedelta | dict[str, float] | None = None, storage_options: None | dict[str, bool | int | str] = None) -> Generator[tuple[UPath, dict[str, Any]]]

Generate file UPaths and metadata for a given bucket by polling.
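
The signature above accepts polling_interval and start_from either as timedelta objects or as dictionaries. A minimal sketch of how such a value could be normalised follows, assuming the dictionary keys are keyword arguments to datetime.timedelta (as stated for the other polling backends) and that a missing start_from means "no look-back". The as_timedelta name is hypothetical and not a library function.

```python
from datetime import timedelta


def as_timedelta(interval):
    """Return a timedelta from a timedelta, a dict of timedelta kwargs, or None.

    Hypothetical helper; None is mapped to a zero timedelta (assumption).
    """
    if interval is None:
        return timedelta(0)
    if isinstance(interval, timedelta):
        return interval
    # Dictionaries such as {"seconds": 10} are expanded as keyword arguments.
    return timedelta(**interval)


# Values taken from the configuration example above.
polling_interval = as_timedelta({"seconds": 10})
start_from = as_timedelta({"days": 1})
print(polling_interval.total_seconds(), start_from.total_seconds())
```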

pytroll_watchers.s3_poller.file_publisher(config: dict[str, Any])

Publish files coming from polling an S3 bucket.

Parameters:
  • config – the configuration dictionary, containing in particular an fs_config section, which is the configuration for the filesystem watching and will be passed as arguments to file_generator. The other sections are passed further to file_publisher_from_generator.

Generate download links.

Generate download links since date.

Copernicus dataspace watcher

Module to provide file generator and publisher for the Copernicus dataspace contents.

It polls the catalogue using OData for new data (https://documentation.dataspace.copernicus.eu/APIs/OData.html) and generates locations for the data on the S3 services (https://documentation.dataspace.copernicus.eu/APIs/S3.html).

Note

The OData and S3 services require two different sets of credentials. OData credentials can be passed as “username” and “password” in the configuration file, as “OAUTH_USERNAME” and “OAUTH_PASSWORD” in the environment, or in a netrc file pointed to by the config’s “netrc_host” and “netrc_file” (the latter is optional and defaults to ~/.netrc). S3 credentials can be passed through the standard AWS config files or environment variables.
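
To make the three OData credential sources concrete, here is a minimal sketch using only the standard library. The resolve_credentials function is hypothetical, and the lookup order (configuration first, then environment, then netrc) is an assumption made for this example; the library may resolve them differently.

```python
import netrc
import os


def resolve_credentials(auth, environ=None):
    """Return (username, password) from a config dict, the environment, or a netrc file.

    Hypothetical helper; the lookup order is an assumption made for this sketch.
    """
    environ = os.environ if environ is None else environ
    if "username" in auth and "password" in auth:
        return auth["username"], auth["password"]
    if "OAUTH_USERNAME" in environ and "OAUTH_PASSWORD" in environ:
        return environ["OAUTH_USERNAME"], environ["OAUTH_PASSWORD"]
    if "netrc_host" in auth:
        # netrc.netrc(None) falls back to ~/.netrc.
        entry = netrc.netrc(auth.get("netrc_file")).authenticators(auth["netrc_host"])
        if entry is not None:
            login, _account, password = entry
            return login, password
    raise ValueError("no credentials found")


print(resolve_credentials({"username": "me", "password": "secret"}, environ={}))
```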

Example of configuration file to retrieve SAR data from dataspace:

backend: dataspace
fs_config:
  filter_string: "contains(Name,'IW_GRDH')"
  dataspace_auth:
    netrc_host: catalogue.dataspace.copernicus.eu
  storage_options:
    profile: copernicus
  polling_interval:
    minutes: 10
  start_from:
    hours: 1
publisher_config:
  name: s1_watcher
  nameservers: false
  port: 3000
message_config:
  subject: /segment/s1/l1b/
  atype: file
  aliases:
    sensor:
      SAR: SAR-C
data_config:
  unpack:
    format: directory
    include_dir_in_uid: true
class pytroll_watchers.dataspace_watcher.CopernicusOAuth2Session(dataspace_auth)

An oauth2 session for copernicus dataspace.

fetch_token()

Fetch the token.

get(url)

Run a get request.

has_valid_token()

Do we have a valid token.

post(url, payload)

Post a payload.

pytroll_watchers.dataspace_watcher.file_generator(filter_string, polling_interval, dataspace_auth, start_from=None, storage_options=None)

Generate new objects by polling copernicus dataspace.

Parameters:
  • filter_string – the filter to use for narrowing the data to poll. For example, to poll level 1 OLCI data, it can be contains(Name,'OL_1_EFR'). For more information on the filter parameters, check: https://documentation.dataspace.copernicus.eu/APIs/OData.html

  • polling_interval – the interval (timedelta object or kwargs to timedelta) at which the dataspace will be polled.

  • dataspace_auth – the authentication information, as a dictionary. It can be a dictionary with username and password keys, or with netrc_host and optionally netrc_file if the credentials are to be fetched with netrc.

  • start_from – how far back in time to fetch the data the first time. This is helpful for the first iteration of the generator, so that data from the past can be fetched, to fill a possible gap. Defaults to 0, meaning nothing older than when the generator starts will be fetched. Same format accepted as polling_interval.

  • storage_options – The options to pass to the S3Path instance. These usually include ways to get credentials to the Copernicus object store, like profile from the .aws configuration files.

Yields:

Tuples of UPath (s3) and metadata.
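
The interplay of polling_interval and start_from described above can be sketched as a generic polling loop. This is not the library's implementation: poll, fetch_since, sleep and max_polls are hypothetical names, and fetch_since stands in for a catalogue query returning (publication date, item) pairs published strictly after a given date.

```python
import time
from datetime import datetime, timedelta, timezone


def poll(fetch_since, polling_interval, start_from=timedelta(0), sleep=time.sleep, max_polls=None):
    """Yield items returned by fetch_since(date), polling repeatedly.

    max_polls only exists to keep this sketch finite; a real watcher would loop forever.
    """
    # The first poll looks back start_from in time; with 0, nothing older than now is fetched.
    last_date = datetime.now(timezone.utc) - start_from
    polls = 0
    while max_polls is None or polls < max_polls:
        for publication_date, item in fetch_since(last_date):
            # Remember the newest publication date so items are not fetched twice.
            last_date = max(last_date, publication_date)
            yield item
        polls += 1
        if max_polls is None or polls < max_polls:
            sleep(polling_interval.total_seconds())


# A fake catalogue with one item older than the look-back window and one inside it.
now = datetime.now(timezone.utc)
catalogue = [(now - timedelta(hours=2), "old"), (now - timedelta(minutes=30), "recent")]


def fetch_since(date):
    return [(published, item) for published, item in catalogue if published > date]


print(list(poll(fetch_since, timedelta(minutes=10), start_from=timedelta(hours=1),
                sleep=lambda seconds: None, max_polls=2)))  # prints ['recent']
```

With start_from left at its default of 0, the same call would yield nothing, since both catalogue entries predate the start of the generator.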

pytroll_watchers.dataspace_watcher.file_publisher(config)

Publish files coming from polling the Copernicus dataspace.

Parameters:
  • config – the configuration dictionary, containing in particular an fs_config section, which is the configuration for the filesystem watching and will be passed as arguments to file_generator. The other sections are passed further to file_publisher_from_generator.

Generate download links for a given filter_string.

Generate download links for data that was published since a given last publication_date.

Example

To fetch download links since yesterday, using netrc-stored credentials and an AWS S3 profile:

>>> import datetime
>>> from pytroll_watchers.dataspace_watcher import generate_download_links_since
>>> filter_string = "contains(Name,'OL_1_EFR')"
>>> dataspace_auth = dict(netrc_host="dataspace.copernicus.eu")
>>> last_publication_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=24)
>>> storage_options = dict(profile="my_copernicus_s3_profile")
>>> generator = generate_download_links_since(filter_string, dataspace_auth, last_publication_date,
...                                           storage_options)
pytroll_watchers.dataspace_watcher.get_oauth(**dataspace_auth)

Get the OAuth’ed session.

EUMETSAT datastore watcher

Module to provide file generator and publisher for the EUMETSAT datastore contents.

It polls the catalogue using Opensearch for new data and generates locations for the data on https.

Note

The links produced can only be downloaded with a valid token. A token comes with the links, but has only a limited validity time (maybe 5 minutes).

Note

OAuth credentials can be passed as “username” and “password” in the configuration file, as “OAUTH_USERNAME” and “OAUTH_PASSWORD” in the environment, or in a netrc file pointed to by the config’s “netrc_host” and “netrc_file” (the latter is optional and defaults to ~/.netrc).

An example for getting links to MSG data:

import datetime

from pytroll_watchers.datastore_watcher import generate_download_links_since

ds_auth = dict(netrc_host="api.eumetsat.int")

collection = "EO:EUM:DAT:MSG:HRSEVIRI"

search_params = dict(collection=collection)
now = datetime.datetime.now(datetime.timezone.utc)
# Start six hours back in time.
six_hours_ago = now - datetime.timedelta(hours=6)

links = list(generate_download_links_since(search_params, ds_auth, six_hours_ago))

Another example, here a configuration file to pass to the CLI:

backend: datastore
fs_config:
  search_params:
    collection: "EO:EUM:DAT:0905"
  polling_interval:
    minutes: 5
  start_from:
    hours: 6
  ds_auth:
    netrc_host: api.eumetsat.int

publisher_config:
  name: <my watcher publisher name>
  nameservers: false
  port: <my port number>
message_config:
  subject: /my/datastore/watcher/topic
  atype: file
  data:
    sensor: aws
    platform_name: AWS1
    variant: GDS
final class pytroll_watchers.datastore_watcher.DatastoreOAuth2Session(datastore_auth)

An oauth2 session for eumetsat datastore.

fetch_token()

Fetch the token.

get(params)

Run a get request.

property token

Return the current token.

pytroll_watchers.datastore_watcher.file_generator(search_params, polling_interval, ds_auth, start_from=None) -> Generator[UPath, dict[str, Any]]

Search params must contain at least collection.

Parameters:
  • search_params – the dictionary of search parameters to request. Based on the OpenSearch API: https://user.eumetsat.int/api-definitions/data-store-opensearch-api

  • polling_interval – how often to poll for new data. Can be provided as a timedelta or a dictionary of arguments for timedelta.

  • ds_auth – either a dictionary with netrc_host (and optionally netrc_file), or a dictionary with username and password.

  • start_from – a timedelta or dictionary of arguments to timedelta to specify how far back in time to start fetching data. None by default, which means the data will be no older than now.

pytroll_watchers.datastore_watcher.file_publisher(config: dict[str, Any])

Publish files coming from polling the EUMETSAT datastore.

Parameters:
  • config – the configuration dictionary, containing in particular an fs_config section, which is the configuration for the filesystem watching and will be passed as arguments to file_generator. The other sections are passed further to file_publisher_from_generator.

Generate download links given search parameters and authentication.

Generate download links for data that was published since start_from.

DHuS watcher

Watcher for DHuS instances.

For more information about DHuS, check out https://sentineldatahub.github.io/DataHubSystem/about.html

An example configuration file to retrieve Sentinel 1 data from a DHuS instance:

backend: dhus
fs_config:
  server: https://myhub.someplace.org/
  filter_params:
      - substringof('IW_GRDH',Name)
  polling_interval:
      seconds: 10
  start_from:
      hours: 6
publisher_config:
  name: s1_watcher
message_config:
  subject: /segment/s1/l1b/
  atype: dataset
  aliases:
      sensor:
        SAR: SAR-C
data_config:
  unpack:
    format: zip
pytroll_watchers.dhus_watcher.file_generator(server, filter_params, polling_interval, start_from=None)

Generate new objects by polling a DHuS instance.

Parameters:
  • server – the DHuS server to use.

  • filter_params – the list of filter parameters to use for narrowing the data to poll. For example, to poll IW SAR data, it can be substringof('IW_GRDH',Name). For more information on the filter parameters, check: https://scihub.copernicus.eu/twiki/do/view/SciHubUserGuide/ODataAPI#filter

  • polling_interval – the interval (timedelta object or kwargs to timedelta) at which the DHuS will be polled.

  • start_from – how far back in time to fetch the data the first time. This is helpful for the first iteration of the generator, so that data from the past can be fetched, to fill a possible gap. Defaults to 0, meaning nothing older than when the generator starts will be fetched. Same format accepted as polling_interval.

Yields:

Tuples of UPath (http) and metadata.

Note

As this watcher uses requests, the authentication information should be stored in a .netrc file.

pytroll_watchers.dhus_watcher.file_publisher(config)

Publish files coming from polling a DHuS instance.

Parameters:
  • config – the configuration dictionary, containing in particular an fs_config section, which is the configuration for the filesystem watching and will be passed as arguments to file_generator. The other sections are passed further to file_publisher_from_generator.

Generate download links.

The filter params we can use are defined here: https://scihub.copernicus.eu/twiki/do/view/SciHubUserGuide/ODataAPI#filter

Generate download links for the data published since last_publication_date.

pytroll_watchers.dhus_watcher.read_gml(gml_string)

Read the gml string.

Adding a new backend

The base concept of the pytroll watchers library is very simply the publishing of file events. In order to add a new backend, two things need to be done:

  1. Implement a generator that iterates over filesystem events and generates a pair of (file item, file metadata). The file item is the (U)Path object to the file (check out universal_pathlib). The file metadata is a dictionary that contains the metadata of the file. It can be formatted in the same fashion as the message config. If it does not contain a data key, it is expected to be the contents of the data key itself (i.e. subject and atype will not be used as message parameters, but rather as data items).

  2. Add an entry point to the new backend module. This module is expected to implement a file_publisher that just takes the config dictionary as argument.
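
The two steps above can be sketched as a toy backend module. Everything here is illustrative: the one-shot directory listing stands in for real event watching, the suffix parameter is made up, and publish_from_generator is a stand-in for the library's file_publisher_from_generator, whose exact signature is assumed rather than reproduced. Registering the module as an entry point is left out, as it depends on the packaging setup.

```python
from pathlib import Path


def file_generator(directory, suffix=".nc"):
    """Yield (file item, file metadata) pairs for files already present in directory.

    Hypothetical one-shot backend: a real one would keep watching for new events.
    """
    for path in sorted(Path(directory).glob(f"*{suffix}")):
        # No "data" key here, so the whole dictionary is treated as the data items.
        yield path, {"filename": path.name}


def publish_from_generator(generator, **other_sections):
    """Stand-in for the library's file_publisher_from_generator (signature assumed)."""
    for file_item, metadata in generator:
        print(f"would publish {file_item} with {metadata}")


def file_publisher(config):
    """Backend entry point: takes the whole configuration dictionary."""
    config = dict(config)  # shallow copy, so the caller's dictionary is left untouched
    # The fs_config section is passed as arguments to the generator...
    generator = file_generator(**config.pop("fs_config"))
    # ...and the remaining sections are passed further to the publishing function.
    return publish_from_generator(generator, **config)
```

A call such as file_publisher({"backend": "mine", "fs_config": {"directory": "/data"}, "message_config": {}}) would then print one line per matching file in /data.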