CWL CommandLineTool for stage-out
Below a stage-out.cwl
CWL (Common Workflow Language) document for a command-line tool that executes a Python script.
The document is designed to use a container and execute a Python script named "stage.py":
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133 | cwlVersion: v1.0
class: CommandLineTool
id: stage-out
doc: "Stage-out the results to S3"
inputs:
s3_bucket:
type: string
sub_path:
type: string
aws_access_key_id:
type: string
aws_secret_access_key:
type: string
region_name:
type: string
endpoint_url:
type: string
stac_catalog:
type: Directory
outputs:
s3_catalog_output:
outputBinding:
outputEval: ${ return "s3://" + inputs.s3_bucket + "/" + inputs.sub_path + "/catalog.json"; }
type: string
baseCommand:
- python
- stage.py
arguments:
- $( inputs.stac_catalog.path )
- $( inputs.s3_bucket )
- $( inputs.sub_path )
requirements:
DockerRequirement:
dockerPull: ghcr.io/terradue/ogc-eo-application-package-hands-on/stage:1.3.2
InlineJavascriptRequirement: {}
EnvVarRequirement:
envDef:
aws_access_key_id: $( inputs.aws_access_key_id )
aws_secret_access_key: $( inputs.aws_secret_access_key )
aws_region_name: $( inputs.region_name )
aws_endpoint_url: $( inputs.endpoint_url )
ResourceRequirement: {}
InitialWorkDirRequirement:
listing:
- entryname: stage.py
entry: |-
import os
import sys
import pystac
import botocore
import boto3
import shutil
from pystac.stac_io import DefaultStacIO, StacIO
from urllib.parse import urlparse
cat_url = sys.argv[1]
bucket = sys.argv[2]
subfolder = sys.argv[3]
aws_access_key_id = os.environ["aws_access_key_id"]
aws_secret_access_key = os.environ["aws_secret_access_key"]
region_name = os.environ["aws_region_name"]
endpoint_url = os.environ["aws_endpoint_url"]
shutil.copytree(cat_url, "/tmp/catalog")
cat = pystac.read_file(os.path.join("/tmp/catalog", "catalog.json"))
class CustomStacIO(DefaultStacIO):
"""Custom STAC IO class that uses boto3 to read from S3."""
def __init__(self):
self.session = botocore.session.Session()
self.s3_client = self.session.create_client(
service_name="s3",
use_ssl=True,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
endpoint_url=endpoint_url,
region_name=region_name,
)
def write_text(self, dest, txt, *args, **kwargs):
parsed = urlparse(dest)
if parsed.scheme == "s3":
self.s3_client.put_object(
Body=txt.encode("UTF-8"),
Bucket=parsed.netloc,
Key=parsed.path[1:],
ContentType="application/geo+json",
)
else:
super().write_text(dest, txt, *args, **kwargs)
client = boto3.client(
"s3",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
endpoint_url=endpoint_url,
region_name=region_name,
)
StacIO.set_default(CustomStacIO)
for item in cat.get_items():
for key, asset in item.get_assets().items():
s3_path = os.path.normpath(
os.path.join(os.path.join(subfolder, item.id, asset.href))
)
print(f"upload {asset.href} to s3://{bucket}/{s3_path}",file=sys.stderr)
client.upload_file(
asset.get_absolute_href(),
bucket,
s3_path,
)
asset.href = f"s3://{bucket}/{s3_path}"
item.add_asset(key, asset)
cat.normalize_hrefs(f"s3://{bucket}/{subfolder}")
for item in cat.get_items():
# upload item to S3
print(f"upload {item.id} to s3://{bucket}/{subfolder}", file=sys.stderr)
pystac.write_file(item, item.get_self_href())
# upload catalog to S3
print(f"upload catalog.json to s3://{bucket}/{subfolder}", file=sys.stderr)
pystac.write_file(cat, cat.get_self_href())
print(f"s3://{bucket}/{subfolder}/catalog.json", file=sys.stdout)
|
The Python script stage.py
uploads a Spatio-Temporal Asset Catalog (STAC) catalog to an Object Storage S3 bucket.
This script utilizes the boto3
library for the S3 operations and customizes the STAC I/O to read from and write to an S3 bucket.
Here's a breakdown of what the script does:
-
The script imports necessary libraries and modules, including os
, sys
, pystac
, botocore
, boto3
, shutil
, and more.
-
It reads command-line arguments to get the STAC catalog URL, S3 bucket name, and a subfolder path to where the catalog will be uploaded.
-
It retrieves the S3 credentials and endpoint URL from environment variables.
-
The script copies the STAC catalog (from the given URL) to a temporary directory under "/tmp/catalog."
-
A custom STAC I/O class (CustomStacIO
) is defined. This class extends the DefaultStacIO
class and uses boto3
to interact with S3. It has methods for writing STAC text content to S3.
-
A connection to S3 is established using boto3
with the provided S3 credentials and endpoint URL.
-
The default STAC I/O class is set to the custom CustomStacIO
.
-
The script iterates through items in the STAC catalog, including their assets. For each asset, it uploads the data to the specified S3 bucket under the given subfolder and updates the asset's href to the S3 location.
-
After uploading all assets, it normalizes the catalog's hrefs to point to the S3 location.
-
The script then uploads each item in the catalog to S3, updating their hrefs accordingly.
-
Finally, it uploads the catalog itself to S3.
-
The script prints the S3 URL of the uploaded catalog to the standard output.
This script uploads a STAC catalog and its associated assets to an S3 bucket, making it accessible through the S3 endpoint.
It uses a custom STAC I/O to facilitate S3 interactions.
The AWS credentials and endpoint URL need to be properly configured in the environment variables for the script to work. A
| export WORKSPACE=/workspace/mastering-app-package
podman \
build \
--format docker \
-t localhost/stage:latest \
${WORKSPACE}/water-bodies/command-line-tools/stage
|
To run the stage-out step, one would run:
| cwltool \
--podman \
${WORKSPACE}/cwl-cli/stage-out.cwl \
--aws_access_key_id $( cat ~/.aws/credentials | grep aws_access_key_id | cut -d "=" -f 2 ) \
--aws_secret_access_key $( cat ~/.aws/credentials | grep aws_secret_access_key | cut -d "=" -f 2 ) \
--endpoint_url $( cat ~/.aws/config | grep endpoint_url | head -n 1 | cut -d "=" -f 2 ) \
--region_name $( cat ~/.aws/config | grep region | cut -d "=" -f 2 ) \
--s3_bucket processing-results \
--sub_path processing-results/$( cat /proc/sys/kernel/random/uuid ) \
--stac_catalog ${WORKSPACE}/runs
|