"""
Functions for working with Sentinel-2 data stored in the Microsoft Azure
Block-Blob service.
"""
import fnmatch
import gc
import glob
import os

from azure.storage.blob import BlockBlobService

class AzureAccess:
    """
    Manages a connection to an Azure blob storage account and provides upload and
    download helpers for Sentinel-2 L2A products stored there.
    """

    _acc_name = None
    _acc_key = None
    block_blob_service = None

    def __init__(self, acc_name, acc_key):
        self._acc_name = acc_name
        self._acc_key = acc_key
        self.connect_to_service()

    def connect_to_service(self):
        self.block_blob_service = BlockBlobService(account_name=self._acc_name,
                                                   account_key=self._acc_key)
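
    # A minimal usage sketch (the storage account name and key are hypothetical;
    # BlockBlobService is the legacy azure-storage-blob < 12 API used throughout
    # this module):
    #
    #     azure_access = AzureAccess(acc_name="mystorageaccount",
    #                                acc_key="<storage-account-key>")
    #     for container in azure_access.block_blob_service.list_containers():
    #         print(container.name)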
    def send_to_blob(self, tile, L1Cpath, check_blobs=False):
        """
        Uploads processed L2A products to blob storage. Checks whether a container
        matching the tile name already exists: if so, files are added to that
        container; if not, a new container named after the tile is created first.

        :param tile: ESA tile ID, used (lower-cased) as the container name
        :param L1Cpath: local path containing the folders of processed files to upload
        :param check_blobs: if True, verify the upload by listing the blobs in the container
        :return upload_status: 1 if the upload succeeded (or was not checked), 0 otherwise
        """
        container_name = tile.lower()  # Azure container names must be lower case

        local_path = L1Cpath

        # find the files to upload: walk each folder under local_path, collecting
        # file paths and names, then filter down to the relevant .jp2 files
        for folder in os.listdir(local_path):
            file_names = []
            file_paths = []
            folder_path = os.path.join(local_path, folder)

            for (dirpath, dirnames, filenames) in os.walk(folder_path):
                file_paths += [os.path.join(dirpath, file) for file in filenames]
                file_names += [name for name in filenames]

            filtered_paths = fnmatch.filter(file_paths, "*.jp2")
            filtered_names = fnmatch.filter(file_names, "*.jp2")
            # check for existing containers
            existing_containers = self.block_blob_service.list_containers()
            existing_container_names = [item.name for item in existing_containers]

            if any(tile.lower() in p for p in existing_container_names):
                print('\n', ' CONTAINER {} ALREADY EXISTS IN STORAGE ACCOUNT '.format(tile).center(80, 'X'),
                      '\n', 'X'.center(80, 'X'))
                # add files to the existing container matching the tile name
                print('\n', ' UPLOADING FOLDERS TO EXISTING CONTAINER {} '.format(tile).center(80, 'X'),
                      '\n', 'X'.center(80, 'X'))

                for i in range(len(filtered_paths)):
                    source = str(filtered_paths[i])
                    destination = str(folder + '/' + filtered_names[i])

                    try:
                        self.block_blob_service.create_blob_from_path(container_name, destination, source)
                    except Exception as e:
                        print("Uploading {} to blob failed: {}".format(source, e))
            else:
                print('\n',
                      ' CONTAINER DOES NOT ALREADY EXIST. CREATING NEW CONTAINER {} '.format(tile).center(80, 'X'),
                      '\n', 'X'.center(80, 'X'))

                # create a container named after the tile, then add files to it
                self.block_blob_service.create_container(container_name)

                print('\n', ' CONTAINER CREATED. UPLOADING FOLDERS TO NEW CONTAINER '.center(80, 'X'),
                      '\n', 'X'.center(80, 'X'))

                for i in range(len(filtered_paths)):
                    source = str(filtered_paths[i])
                    destination = str(folder + '/' + filtered_names[i])

                    try:
                        self.block_blob_service.create_blob_from_path(container_name, destination, source)
                    except Exception as e:
                        print("Uploading {} to blob failed: {}".format(source, e))
            if check_blobs:
                blob_list = []
                print("Retrieving blobs in specified container...")

                try:
                    content = self.block_blob_service.list_blobs(container_name)
                    print("******Blobs currently in the container:**********")
                    for blob in content:
                        blob_list.append(blob.name)
                except Exception as e:
                    print('\n', ' CHECKING BLOBS FAILED '.center(80, 'X'),
                          '\n', 'X'.center(80, 'X'))
                    print(e)

                print('\n', ' BLOBS CURRENTLY STORED IN CONTAINER {}: '.format(tile).center(80, 'X'))
                print("\n", blob_list)

                if any(folder in p for p in blob_list):
                    print('\n', ' UPLOAD SUCCESSFUL: FOLDERS IN BLOB MATCH THOSE IN SCRIPT '.center(80, 'X'),
                          '\n', 'X'.center(80, 'X'))
                    upload_status = 1
                else:
                    upload_status = 0
            else:
                upload_status = 1  # if check_blobs is deselected, assume the upload succeeded (not recommended)

        return upload_status
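
    # A sketch of a typical call, assuming the hypothetical azure_access instance
    # from above and illustrative tile/path values:
    #
    #     upload_status = azure_access.send_to_blob("22wev", "/datadrive/L2A/",
    #                                               check_blobs=True)
    #     if upload_status == 0:
    #         print("upload could not be verified")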
    def dataset_to_blob(self, path_to_ds, delete_local_nc=True):
        """
        Uploads the output spatial datasets to blob storage and optionally deletes
        them from local storage. This was introduced because running everything
        locally was using up all available disk space on the VM.
        """
print("\nUploading netCDF to blob storage\n")
container_name = 'bisc-outputs/' #name of container in blob store to collect datasets into
# get list of existing containers
existing_containers = self.block_blob_service.list_containers()
existing_container_names = []
for item in existing_containers:
existing_container_names.append(item.name)
# check to see if the bisc-outputs blob container already exists
if any(container_name in p for p in existing_container_names):
print('\n',' CONTAINER {} ALREADY EXISTS IN STORAGE ACCOUNT '.format(tile))
else:
# if container does not exist, create it
print('container doesnot exist, now creating it\n')
self.block_blob_service.create_container(container_name)
        # with the container in place, send all files in path_to_ds to the blob,
        # skipping the "interpolated" subdirectory
        for file in os.listdir(path_to_ds):
            if file != 'interpolated':
                self.block_blob_service.create_blob_from_path(container_name, file, os.path.join(path_to_ds, file))
                print("Uploading {}".format(file))
        # if toggled, delete the uploaded files from local storage
        if delete_local_nc:
            files = os.listdir(path_to_ds)
            for f in files:
                if f != 'interpolated':  # skip the subdirectory: only the uploaded files are deleted
                    try:
                        os.remove(os.path.join(path_to_ds, f))
                    except OSError:
                        print("did not delete {}".format(f))

        # explicitly call the garbage collector to deallocate memory
        print("GARBAGE COLLECTION\n")
        gc.collect()

        return
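
    # A sketch of intended use once the processing pipeline has written its netCDF
    # outputs (the output path is illustrative only):
    #
    #     azure_access.dataset_to_blob("/datadrive/outputs/", delete_local_nc=True)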
    def download_imgs_by_date(self, tile, date, img_path):
        """
        Downloads subsets of images stored remotely in Azure blobs. The container
        name is identical to the ESA tile ID. Inside each container are images from
        every overpass made in June, July and August of the given year. The files in
        blob storage are L2A files, meaning the L1C product has been downloaded from
        Sentinel-Hub and processed for atmospheric conditions, spatial resolution
        etc. using the Sen2Cor command line tool.

        This function searches for the container associated with "tile" and then
        filters out a subset according to the prescribed date.

        A flag can be raised in this function. The script checks that the correct
        number of image files have been downloaded and that one of them is the cloud
        mask. If not, the flag is printed to the console and the files associated
        with that particular date for that tile are discarded. The tile and date
        info are appended to a list of failed downloads.

        :param tile: tile ID
        :param date: date of overpass
        :param img_path: path to folder where images and other temp files will be stored
        :return filtered_bloblist: list of files to download
        :return download_flag: Boolean; if True there was a problem with the download and the files are skipped
        """
        # set up list and flag
        bloblist = []
        download_flag = False

        # append the names of all blobs in the tile's container to bloblist
        generator = self.block_blob_service.list_blobs(tile)
        for blob in generator:
            bloblist.append(blob.name)

        # filter the full bloblist to just the 20 m jp2s, then to the specified date
        filtered_by_type = [string for string in bloblist if '_20m.jp2' in string]
        filtered_bloblist = [string for string in filtered_by_type if date in string]
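
        # For illustration, a hypothetical blob name that would survive both filters
        # for date "20190705" (the exact folder prefix depends on how the files were
        # uploaded by send_to_blob):
        #
        #     ".../GRANULE/.../IMG_DATA/R20m/T22WEV_20190705T151912_B02_20m.jp2"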
        # Set up download loop for obtaining files for the correct tile and date.
        # Note that this is done inside an if/else statement that uses a different
        # loop depending on whether the year is before or after 2017. This is because
        # the file naming convention changed slightly for the 2018 files, requiring a
        # different string extraction. The loop for 2018, 2019 and 2020 will probably
        # now work for all dates, but it has not yet been properly tested, so this
        # slightly ugly workaround persists for now.
        if date[0:4] in ('2018', '2019', '2020'):
            print("FILTERED BLOBLIST")
            print(filtered_bloblist)

            # download the files in the filtered list
            # (slice from character 65 to strip the folder prefix from the filename)
            for i in filtered_bloblist:
                try:
                    self.block_blob_service.get_blob_to_path(tile,
                                                             i, str(img_path + i[65:-4] + '.jp2'))
                except Exception as e:
                    print("ERROR IN DOWNLOADS: {}".format(e))
        else:
            # download the files in the filtered list
            # (slice from -38 because this is the filename without the folder paths)
            for i in filtered_bloblist:
                print(i)
                try:
                    self.block_blob_service.get_blob_to_path(tile,
                                                             i, str(img_path + i[-38:-4] + '.jp2'))
                except Exception as e:
                    print("download failed {}: {}".format(i, e))
        # Check the downloaded files to make sure all bands plus the cloud mask are
        # present in the working directory. Raises the download flag (Boolean True)
        # and reports to the console if there is a problem.
        if len(glob.glob(str(img_path + '*_B*_20m.jp2'))) < 9 or len(glob.glob(str(img_path + '*CLD*_20m.jp2'))) == 0:
            download_flag = True
            print("\n *** DOWNLOAD QC FLAG RAISED *** \n *** There may have been no overpass on this date, or there is"
                  " a band image or cloud layer missing from the downloaded directory ***")
        else:
            download_flag = False
            print("\n *** NO DOWNLOAD QC FLAG RAISED: ALL NECESSARY FILES AVAILABLE IN WDIR ***")

        # the relevant files are now downloaded from the blob and stored in the savepath folder
        return filtered_bloblist, download_flag
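
    # A hedged usage sketch (tile and date strings are illustrative; the YYYYMMDD
    # date format is inferred from the year check on date[0:4] above):
    #
    #     bloblist, download_flag = azure_access.download_imgs_by_date(
    #         "22wev", "20190705", "/datadrive/process_dir/")
    #     if download_flag:
    #         print("incomplete download: skipping this date")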