Skip to content

dwdown

dwdown is a Python package designed to download weather forecast data from the Deutscher Wetterdienst (DWD), process it, and upload it to a Object Storage Server. It supports downloading forecast files, uploading them to Object Storage, and processing them for analysis. Furthermore it keeps you informed about the status of downloads, uploads, and any errors.

👉 Check out the GitHub repository!

Features

  • ForecastDownloader: Fetch weather forecast data from the DWD open data server.
  • HistoricalDownloader: Fetch historical observation data from the DWD open data server.
  • MOSMIXDownloader: Fetch MOSMIX forecast data from the DWD open data server.
  • OSDownloader: Download files from a S3 compatible / MinIO object storage server.
  • OSUploader: Upload downloaded data to a S3 compatible / MinIO object storage server with parallel uploads and data integrity checks.
  • GribFileManager: Extract BZ2 archives and convert GRIB2 files to CSV format.
  • DataMerger: Filter and merge CSV dataframes.
  • Notifier: Receive status messages of downloads, uploads, and any errors from a Gotify server.
  • Logging: Automatically log download and upload activities, and handle errors gracefully.
  • Parallel Processing: Download, upload, and process files in parallel for faster performance.

Installation

You can install dwdown via pip:

git clone https://github.com/trholy/dwdown.git 

pip install .

Documentation

Read the documentation on GitLab Pages.

Usage

HistoricalDownloader: Fetch historical observation data from the DWD open data server.

from dwdown.download import HistoricalDownloader


# Initialize HistoricalDownloader
scraper = HistoricalDownloader(
    base_url=None,                       # Base URL for historical data (to be set)
    files_path=None,                     # Path for downloaded files
    extracted_files_path=None,           # Path for extracted files
    log_files_path="log_files",          # Path for log files
    encoding=None,                       # File encoding (to be set)
    station_description_file_name=None,  # Station description filename
    delay=1,                             # 1 second delay between downloads
    retry=0,                             # Don't retry failed downloads
    timeout=30                           # 30 second timeout for requests
)

# Download station descriptions
scraper.download_station_description()

# Read station descriptions
station_descriptions = scraper.read_station_description()
print(station_descriptions)

# Get download links for specific stations
links = scraper.get_links(
    station_ids=['00001','00003'],   # Get links for stations 1 and 3
    prefix="tageswerte_KL",          # File prefix for daily weather data
    suffix="_hist.zip"               # File suffix for historical zip files
)
print(links)

# Download files
scraper.download(check_for_existence=True)

# Unpack ZIP files
scraper.extract(unpack_hist_data_only=True, check_for_existence=True)

# Read and save data as CSV
df = scraper.read_data(save_as_csv=True)
print(df)

MOSMIXDownloader: Fetch MOSMIX forecast data from the DWD open data server.

from dwdown.download import MOSMIXDownloader


# Initialize MOSMIXDownloader
scraper = MOSMIXDownloader(
    mosmix_type="MOSMIX_L",              
    base_url=None,                       # Base URL constructed automatically based on type
    files_path=None,                     # Path for downloaded files (defaults to download_files)
    extracted_files_path=None,           # Path for extracted files (defaults to extracted_files)
    log_files_path="log_files",          # Path for log files
    delay=1,                             # 1 second delay between downloads
    retry=0,                             # Don't retry failed downloads
    timeout=30                           # 30 second timeout for requests
)

# Get download links for specific stations
links = scraper.get_links(
    station_ids=['01001']
)
print(f"Found {len(links)} links:", links)

# Download files
scraper.download(check_for_existence=True)

# Unpack KMZ files
scraper.extract(check_for_existence=True)

# Read and save data as CSV
data = scraper.read_data(save_as_csv=True)

# Print result summary
if data:
    for filename, df in data.items():
        print(f"Processed {filename}:")
        print(df.head())
else:
    print("No data processed.")

ForecastDownloader: Fetch weather forecast data from the DWD open data server.

The ForecastDownloader class allows you to download weather forecast files from the DWD open data server.

from dwdown.download import ForecastDownloader


variables = [
    'aswdifd_s',
    'relhum',
    'smi',
]

for variable in variables:

    # Initialize ForecastDownloader
    dwd_downloader = ForecastDownloader(
        url=f"https://opendata.dwd.de/weather/nwp/icon-d2/grib/09/{variable}/",
        retry=0,  # Dont retry failed downloads (formerly restart_failed_downloads)
        delay=0.1,  # 0.1 seconds delay between downloads
        n_jobs=4,  # Use 4 concurrent workers (formerly workers)
        files_path=f"download_files/09/{variable}",  # Path for downloaded files (formerly download_path)
        log_files_path="log_files"  # Path for log files
    )

    # Fetch download links
    dwd_downloader.get_links(exclude_pattern=["icosahedral"])

    # Download files
    dwd_downloader.download(check_for_existence=True)

    # Print status after download
    print("Successfully downloaded files:", dwd_downloader.downloaded_files)
    print("Failed downloads:", dwd_downloader.failed_files)

OSUploader: Upload Data to Object Storage

The OSUploader class helps upload files to a MinIO object storage server or any S3-compatible storage, ensuring data integrity with MD5 hash verification.

from dwdown.upload import OSUploader


# Initialize OSUploader
uploader = OSUploader(
    endpoint="your-minio-sever.com",
    access_key="your-access-key",
    secret_key="your-secret-key",
    files_path="download_files",  # Path for files to upload
    bucket_name="weather-forecasts",  # Name of the minio bucket
    secure=False,  # If "true" API requests will be secure (HTTPS), and insecure (HTTP) otherwise
    log_files_path="log_files",  # Path for log files
    n_jobs=4  # Use 4 concurrent workers
)

# Upload files to MinIO
uploader.upload()
uploader.delete()

# Print status after upload
print("Successfully uploaded files:", uploader.uploaded_files)
print("Upload might be corrupted:", uploader.corrupted_files)

OSDownloader: Download Data from Object Storage

The OSDownloader class helps you download files from a MinIO object storage server or any S3-compatible storage.

from dwdown.download import OSDownloader


# Initialize OSDownloader
minio_downloader = OSDownloader(
    endpoint="your-minio-sever.com",
    access_key="your-access-key",
    secret_key="your-secret-key",
    files_path="download_files",  # Path for files to download
    bucket_name="weather-forecasts",  # Name of the minio bucket
    secure=False,  # If "true" API requests will be secure (HTTPS), and insecure (HTTP) otherwise
    log_files_path="log_files",  # Path for log files
    n_jobs=4  # Use 4 concurrent workers
)

# Download files from MinIO
minio_downloader.download()

# Print status after upload
print("Successfully downloaded files:", minio_downloader.downloaded_files)
print("Download might be corrupted:", minio_downloader.corrupted_files)

GribFileManager & DataMerger: Process and Merge Data

The GribFileManager handles decompression and conversion of GRIB2 files, while DataMerger (formerly DataEditor) allows for merging and filtering CSV dataframes.

from dwdown.processing import DataMerger, GribFileManager


# Initialize the GribFileManager (formerly DataProcessor)
processor = GribFileManager(
    files_path="download_files",  # Path for files to process (formerly search_path)
    extracted_files_path="extracted_files",  # Path for extracted files (formerly extraction_path)
    converted_files_path="csv_files",  # Path for CSV files
)

# Retrieve the filenames that have been downloaded
file_names = processor.get_filenames()

# Convert downloaded files into CSV format
processor.get_csv(
    file_names=file_names,
    apply_geo_filtering=True,
    start_lat=50.840,
    end_lat=51.000,
    start_lon=11.470,
    end_lon=11.690,
)

# Variables to build merged dataframe from
variables = [
    'aswdifd_s',
    'relhum',
    'smi',
]

# External mapping dictionary
mapping_dictionary = {
    'aswdifd_s': 'ASWDIFD_S',
    'relhum': 'r',
    'smi': 'SMI',
}

# Pattern selection for known variables
additional_patterns = {
    "relhum": [200, 975, 1000],
    "smi": [0, 9, 27],
}

# Initialize DataMerger (formerly DataEditor)
data_editor = DataMerger(
    files_path='csv_files/09/',
    required_columns={
        'latitude', 'longitude', 'valid_time'
    },
    join_method='inner',
    mapping_dictionary=mapping_dictionary,
    additional_patterns=additional_patterns, # formerly additional_pattern_selection
)

df = data_editor.merge(
    time_step=0,
    variables=variables
)
print("Processed DataFrame:", df)

df.to_csv('processed_dataframe.csv')

Notifier: Send Status Updates

The Notifier class keeps you informed about the status of downloads, uploads, and any errors via a Gotify server.

from minio import Minio
from dwdown.notify import Notifier


# Initialize Notifier
notifier = Notifier(
    server_url="your-gotify-sever.com",
    token="your-access-token",
    priority=5,
    secure=False  # Set to True if your MinIO server is HTTPS
)

# Initialize minio client
minio_client = Minio(
    endpoint="your-minio-sever.com",
    access_key="your-access-key",
    secret_key="your-secret-key",
    secure=False  # Set to True if your MinIO server is HTTPS
)

# List all buckets
buckets = minio_client.list_buckets()

status_dict = {}

for bucket in buckets:
    bucket_name = bucket.name
    print(f"Processing bucket: {bucket_name}")

    # List all objects in the bucket
    objects = minio_client.list_objects(bucket_name, recursive=True)

    # Get number of objects in the bucket
    status_dict[bucket_name] = [len([obj.object_name for obj in objects])]

# Send notification
notifier.send_notification(
    message=status_dict,
    script_name="download-VM"
)

Directory Structure

The package structure is as follows:

./
├── .git
├── .gitignore
├── .gitlab-ci.yml
├── LICENSE
├── README.md
├── THIRD_PARTY_LICENSES.txt
├── docs
│   ├── data
│   │   └── MappingStore.md
│   ├── download
│   │   ├── ForecastDownloader.md
│   │   ├── HistoricalDownloader.md
│   │   ├── MosmixDownloader.md
│   │   └── OSDownloader.md
│   ├── notify
│   │   └── Notifier.md
│   ├── processing
│   │   ├── DataMerger.md
│   │   └── GribFileManager.md
│   ├── upload
│   │   └── OSUploader.md
│   └── utils
│       ├── DataFrameOperator.md
│       ├── DateTimeUtils.md
│       ├── FileHandler.md
│       ├── LogHandler.md
│       ├── NetworkHandlers.md
│       ├── OSHandler.md
│       └── Utilities.md
├── example_usage
│   ├── 00_dwd_forecast_scraper.py
│   ├── 00b_dwd_hist-station-data_scraper.py
│   ├── 00c_dwd_mosmix_scraper.py
│   ├── 01_os_uploader.py
│   ├── 02_os_downloader.py
│   ├── 03_data_processing.py
├── img
│   └── example_workflow.png
├── mkdocs.yml
├── pyproject.toml
├── setup.py
├── src
│   └── dwdown
│       ├── __init__.py
│       ├── data
│       │   ├── __init__.py
│       │   └── mapping.py
│       ├── download
│       │   ├── __init__.py
│       │   ├── forecast_download.py
│       │   ├── historical_download.py
│       │   ├── mosmix_download.py
│       │   └── os_download.py
│       ├── notify
│       │   ├── __init__.py
│       │   └── notifier.py
│       ├── processing
│       │   ├── __init__.py
│       │   ├── data_merging.py
│       │   └── grib_data_handling.py
│       ├── upload
│       │   ├── __init__.py
│       │   └── os_upload.py
│       └── utils
│           ├── __init__.py
│           ├── date_time_utilis.py
│           ├── df_utilis.py
│           ├── file_handling.py
│           ├── general_utilis.py
│           ├── log_handling.py
│           ├── network_handling.py
│           └── os_handling.py
└── tests
    ├── test_ForecastDownloader.py
    ├── test_HistoricalDownloader.py
    ├── test_MOSMIXDownloader.py
    ├── test_OSDownloader.py
    ├── test_OSUploader.py
    ├── test_date_time_utilis.py
    ├── test_file_handling.py
    ├── test_log_handling.py
    ├── test_mapping.py
    ├── test_network_handling.py
    ├── test_notifier.py
    ├── test_processing.py
    └── test_utils.py

License

This project is licensed under the MIT License. See the LICENSE file for more details.

Authors & Maintainers

  • Thomas R. Holy, Ernst-Abbe-Hochschule Jena

Contributing

Feel free to contribute to the development of dwdown!