# %% [markdown]
# # ERA5 EVAP from AWS: VirtualiZarr & Icechunk

# %%
# Install the warning filter before the heavy third-party imports so that
# UserWarnings raised at import time are suppressed as well.
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

# %%
import os

import fsspec
import icechunk
import xarray as xr
from dotenv import load_dotenv
from obstore.store import from_url

from virtualizarr import open_virtual_dataset
from virtualizarr.parsers import HDFParser
from virtualizarr.registry import ObjectStoreRegistry

# %%
# Bucket holding the source ERA5 files that the virtual references point at.
# NOTE: this value already carries the "s3://" scheme; do not prepend it again.
data_bucket = "s3://nsf-ncar-era5"

# %%
# Load environment variables from the user's dotenv file.
# NOTE(review): presumably this supplies the S3 credentials consumed by
# `from_env=True` / `anon=False` below - confirm.
# Double quotes outside / single quotes inside keeps this f-string valid on
# Python < 3.12, where reusing the enclosing quote is a SyntaxError.
_ = load_dotenv(f"{os.environ['HOME']}/dotenv/rsignell4.env")

# Define Icechunk storage
storage_endpoint = 'https://pangeo-eosc-minioapi.vm.fedcloud.eu'
storage_bucket = 'rsignell4-protocoast'
storage_name = 'era5-evap-icechunk'

# %%
storage = icechunk.s3_storage(
    bucket=storage_bucket,
    prefix=f"icechunk/{storage_name}",
    from_env=True,
    endpoint_url=storage_endpoint,
    region='not-used',  # N/A for Pangeo-EOSC bucket, but required param
    force_path_style=True,
)

# %%
# Register the source bucket as a virtual chunk container so the repository
# is allowed to hold references into it.
config = icechunk.RepositoryConfig.default()
config.set_virtual_chunk_container(
    icechunk.VirtualChunkContainer(
        url_prefix=f"{data_bucket}/",
        store=icechunk.s3_store(
            region="us-west-2",
            anonymous=True,
            s3_compatible=True,
            force_path_style=True,
        ),
    ),
)

# %%
# `fs` lists the source bucket anonymously; `fs_write` talks to the endpoint
# that hosts the Icechunk repository (used to delete an old repository).
fs = fsspec.filesystem('s3', anon=True)
fs_write = fsspec.filesystem('s3', anon=False, endpoint_url=storage_endpoint)

# %%
# NOTE(review): `[1:]` discards the first listing entry - presumably a
# non-month entry at the top of the prefix; confirm against the raw listing.
mon_list = fs.ls(f'{data_bucket}/e5.oper.fc.sfc.accumu/')[1:]

# %%
print(mon_list[0])
print(mon_list[-1])

# %%
# Object store + registry + parser handed to `open_virtual_dataset`.
store = from_url(data_bucket, region="us-west-2", skip_signature=True)
registry = ObjectStoreRegistry({data_bucket: store})
parser = HDFParser()

# %%
# The credentials key must be the same string as the container's `url_prefix`
# above. `data_bucket` already starts with "s3://", so prefixing it again
# produced "s3://s3://nsf-ncar-era5/", which matches no container.
credentials = icechunk.containers_credentials(
    {f"{data_bucket}/": icechunk.s3_credentials(anonymous=True)}
)

# %%
def fix_ds(ds):
    """Return `ds` without the 'utc_date' variable.

    'utc_date' was making xr.concat bomb out, so we drop it.
    """
    return ds.drop_vars(['utc_date'])
# %%
def create_or_append(flist, create=False):
    """Write virtual references for `flist` into the Icechunk repository.

    Each file is opened as a virtual dataset, passed through `fix_ds`, and the
    results are concatenated along "forecast_initial_time". The combined
    dataset is then either written to a freshly created repository
    (`create=True`) or appended to the existing one (`create=False`).

    Parameters
    ----------
    flist : list of str
        Object paths to open. Entries without a scheme get "s3://" prepended;
        entries that already start with "s3://" are used as-is.
    create : bool, default False
        If True, remove any repository already stored under this notebook's
        storage prefix, create a new one, and commit the data as its first
        snapshot. If False, open the existing repository and append along
        "forecast_initial_time".

    Raises
    ------
    ValueError
        If `flist` is empty. Raised before anything is deleted or written.
    """
    # Fail fast: xr.concat cannot combine zero datasets, and in create mode we
    # must not delete the old repository when there is nothing to replace it.
    if not flist:
        raise ValueError("create_or_append() needs at least one file; got an empty list")

    urls = [f if f.startswith('s3://') else f's3://{f}' for f in flist]

    # Build the combined virtual dataset *before* touching the target storage,
    # so a failure while opening inputs leaves an existing repository intact.
    ds_list = [
        fix_ds(
            open_virtual_dataset(
                url=url,
                parser=parser,
                registry=registry,
                loadable_variables=["forecast_initial_time"],
            )
        )
        for url in urls
    ]

    ds = xr.concat(
        ds_list,
        dim="forecast_initial_time",
        coords="minimal",
        compat="override",
        combine_attrs="override",
    )

    if create:
        # Remove old existing icechunk storage with this name.
        # Use the same prefix as the storage.
        repo_path = f's3://{storage_bucket}/icechunk/{storage_name}'
        try:
            fs_write.rm(repo_path, recursive=True)
        except FileNotFoundError:
            # Nothing stored under this prefix yet - nothing to clean up.
            pass
        except Exception as err:
            # Keep the clean-up best-effort, but do not hide the reason.
            print(f'could not remove old icechunk storage at {repo_path}: {err!r}')
        else:
            print('removing old icechunk storage')

        repo = icechunk.Repository.create(storage, config)
        session = repo.writable_session("main")
        ds.virtualize.to_icechunk(session.store)
        session.commit("Initial ERA5 Evap creation")
    else:
        repo = icechunk.Repository.open(storage, config, authorize_virtual_chunk_access=credentials)
        append_session = repo.writable_session("main")
        ds.virtualize.to_icechunk(append_session.store, append_dim="forecast_initial_time")
        append_session.commit("Append more ERA5 Evap Files")

# %% [markdown]
# ## Create the icechunk using mon_list[0]
'nsf-ncar-era5/e5.oper.fc.sfc.accumu/194001/e5.oper.fc.sfc.accumu.128_182_e.ll025sc.1940011606_1940020106.nc']" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "flist = fs.glob(f'{mon_list[0]}/*128_182_e*.nc')\n", "create_or_append(flist, create=True)" ] }, { "cell_type": "markdown", "id": "94aa8d1e-8f40-41b9-9dab-0c72e6e22cc3", "metadata": {}, "source": [ "## Append all the other months" ] }, { "cell_type": "code", "execution_count": null, "id": "d11bfa53-442f-447f-9c37-ce93842834c3", "metadata": {}, "outputs": [], "source": [ "%%time\n", "for mon in mon_list[1:]: # <== starting at mon_list[1] because we used mon_list[0] to create\n", " print(mon)\n", " flist = fs.glob(f'{mon}/*128_182_e*.nc')\n", " create_or_append(flist, create=False)" ] }, { "cell_type": "markdown", "id": "5c951f69-7734-4983-aadf-39f8e4322124", "metadata": {}, "source": [ "## Check that it worked" ] }, { "cell_type": "code", "execution_count": 18, "id": "72b63d0e-0311-4686-b315-292126ccbc14", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
<xarray.Dataset> Size: 9GB\n",
"Dimensions: (forecast_initial_time: 182, forecast_hour: 12,\n",
" latitude: 721, longitude: 1440)\n",
"Coordinates:\n",
" * forecast_initial_time (forecast_initial_time) datetime64[ns] 1kB 1940-01...\n",
" * forecast_hour (forecast_hour) int32 48B 1 2 3 4 5 6 7 8 9 10 11 12\n",
" * latitude (latitude) float64 6kB 90.0 89.75 ... -89.75 -90.0\n",
" * longitude (longitude) float64 12kB 0.0 0.25 0.5 ... 359.5 359.8\n",
"Data variables:\n",
" E (forecast_initial_time, forecast_hour, latitude, longitude) float32 9GB dask.array<chunksize=(5, 12, 721, 1440), meta=np.ndarray>\n",
"Attributes:\n",
" DATA_SOURCE: ECMWF: https://cds.climate.copernicus.eu, Copernicu...\n",
" NETCDF_CONVERSION: CISL RDA: Conversion from ECMWF GRIB1 data to netCDF4.\n",
" NETCDF_VERSION: 4.8.1\n",
" CONVERSION_PLATFORM: Linux r4i0n8 4.12.14-95.51-default #1 SMP Fri Apr 1...\n",
" CONVERSION_DATE: Fri Mar 17 12:52:09 MDT 2023\n",
" Conventions: CF-1.6\n",
" NETCDF_COMPRESSION: NCO: Precision-preserving compression to netCDF4/HD...\n",
" history: Fri Mar 17 12:52:18 2023: ncks -4 --ppc default=7 e...\n",
" NCO: netCDF Operators version 5.0.3 (Homepage = http://n...