From 1a9287cc80eabfc293d95828242f54f6d8870aea Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 10 Aug 2020 10:53:00 -0700 Subject: [PATCH 1/9] migration --- google/cloud/storage/blob.py | 174 ++++++++++++++++++++++++++++++++++- setup.py | 2 +- 2 files changed, 172 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 07a17867c..927dc0d00 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -29,6 +29,7 @@ import copy import hashlib from io import BytesIO +import logging import mimetypes import os import re @@ -107,6 +108,11 @@ _READ_LESS_THAN_SIZE = ( "Size {:d} was specified but the file-like object only had " "{:d} bytes remaining." ) +_CHUNKED_DOWNLOAD_CHECKSUM_MESSAGE = ( + "A checksum of type `{}` was requested, but checksumming is not available " + "for downloads when chunk_size is set." +) + _DEFAULT_CHUNKSIZE = 104857600 # 1024 * 1024 B * 100 = 100 MB _MAX_MULTIPART_SIZE = 8388608 # 8 MB @@ -822,6 +828,7 @@ def _do_download( end=None, raw_download=False, timeout=_DEFAULT_TIMEOUT, + checksum="md5", ): """Perform a download without any error handling. @@ -859,6 +866,17 @@ def _do_download( repeated several times using the same timeout each time. Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + + :type checksum str + :param checksum: + (Optional) The type of checksum to compute to verify the integrity + of the object. The response headers must contain a checksum of the + requested type. If the headers lack an appropriate checksum (for + instance in the case of transcoded or ranged downloads where the + remote service does not know the correct checksum, including + downloads where chunk_size is set) an INFO-level log will be + emitted. Supported values are "md5", "crc32c" and None. The default + is "md5". """ if self.chunk_size is None: if raw_download: @@ -867,12 +885,16 @@ def _do_download( klass = Download download = klass( - download_url, stream=file_obj, headers=headers, start=start, end=end + download_url, stream=file_obj, headers=headers, start=start, end=end, checksum=checksum ) response = download.consume(transport, timeout=timeout) self._extract_headers_from_download(response) else: + if checksum: + msg = _CHUNKED_DOWNLOAD_CHECKSUM_MESSAGE.format(checksum) + logging.info(msg) + if raw_download: klass = RawChunkedDownload else: @@ -978,6 +1000,17 @@ def download_to_file( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + :type checksum str + :param checksum: + (Optional) The type of checksum to compute to verify the integrity + of the object. The response headers must contain a checksum of the + requested type. If the headers lack an appropriate checksum (for + instance in the case of transcoded or ranged downloads where the + remote service does not know the correct checksum, including + downloads where chunk_size is set) an INFO-level log will be + emitted. Supported values are "md5", "crc32c" and None. The default + is "md5". + :raises: :class:`google.cloud.exceptions.NotFound` """ client = self._require_client(client) @@ -1003,6 +1036,7 @@ def download_to_file( end, raw_download, timeout=timeout, + checksum=checksum, ) except resumable_media.InvalidResponse as exc: _raise_from_invalid_response(exc) @@ -1019,6 +1053,7 @@ def download_to_filename( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + checksum="md5" ): """Download the contents of this blob into a named file. @@ -1071,6 +1106,17 @@ def download_to_filename( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + :type checksum str + :param checksum: + (Optional) The type of checksum to compute to verify the integrity + of the object. The response headers must contain a checksum of the + requested type. If the headers lack an appropriate checksum (for + instance in the case of transcoded or ranged downloads where the + remote service does not know the correct checksum, including + downloads where chunk_size is set) an INFO-level log will be + emitted. Supported values are "md5", "crc32c" and None. The default + is "md5". + :raises: :class:`google.cloud.exceptions.NotFound` """ try: @@ -1086,6 +1132,7 @@ def download_to_filename( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, + checksum=checksum, ) except resumable_media.DataCorruption: # Delete the corrupt downloaded file. @@ -1111,6 +1158,7 @@ def download_as_string( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + checksum="md5" ): """Download the contents of this blob as a bytes object. @@ -1160,6 +1208,17 @@ def download_as_string( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + :type checksum str + :param checksum: + (Optional) The type of checksum to compute to verify the integrity + of the object. The response headers must contain a checksum of the + requested type. If the headers lack an appropriate checksum (for + instance in the case of transcoded or ranged downloads where the + remote service does not know the correct checksum, including + downloads where chunk_size is set) an INFO-level log will be + emitted. Supported values are "md5", "crc32c" and None. The default + is "md5". + :rtype: bytes :returns: The data stored in this blob. @@ -1177,6 +1236,7 @@ def download_as_string( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, + checksum=checksum, ) return string_buffer.getvalue() @@ -1278,6 +1338,7 @@ def _do_multipart_upload( if_metageneration_match, if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, + checksum=None, ): """Perform a multipart upload. @@ -1339,6 +1400,14 @@ def _do_multipart_upload( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + :type checksum: str + :param checksum: + (Optional) The type of checksum to compute to verify + the integrity of the object. The request metadata will be amended + to include the computed value. Using this option will override a + manually-set checksum value. Supported values are "md5", + "crc32c" and None. The default is None. + :rtype: :class:`~requests.Response` :returns: The "200 OK" response object returned after the multipart upload request. @@ -1394,7 +1463,7 @@ def _do_multipart_upload( ) upload_url = _add_query_parameters(base_url, name_value_pairs) - upload = MultipartUpload(upload_url, headers=headers) + upload = MultipartUpload(upload_url, headers=headers, checksum=checksum) if num_retries is not None: upload._retry_strategy = resumable_media.RetryStrategy( @@ -1496,6 +1565,16 @@ def _initiate_resumable_upload( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + :type checksum: str + :param checksum: + (Optional) The type of checksum to compute to verify + the integrity of the object. After the upload is complete, the + server-computed checksum of the resulting object will be checked + and google.resumable_media.common.DataCorruption will be raised on + a mismatch. On a failure, the client will attempt to delete the + corrupted file from the remote host automatically. Supported values + are "md5", "crc32c" and None. The default is None. + :rtype: tuple :returns: Pair of @@ -1584,6 +1663,7 @@ def _do_resumable_upload( if_metageneration_match, if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, + checksum=None, ): """Perform a resumable upload. @@ -1648,6 +1728,16 @@ def _do_resumable_upload( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + :type checksum: str + :param checksum: + (Optional) The type of checksum to compute to verify + the integrity of the object. After the upload is complete, the + server-computed checksum of the resulting object will be checked + and google.resumable_media.common.DataCorruption will be raised on + a mismatch. On a failure, the client will attempt to delete the + corrupted file from the remote host automatically. Supported values + are "md5", "crc32c" and None. The default is None. + :rtype: :class:`~requests.Response` :returns: The "200 OK" response object returned after the final chunk is uploaded. @@ -1664,10 +1754,16 @@ def _do_resumable_upload( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, + checksum=checksum ) while not upload.finished: - response = upload.transmit_next_chunk(transport, timeout=timeout) + try: + response = upload.transmit_next_chunk(transport, timeout=timeout) + except google.resumable_media.common.DataCorruption: + # Attempt to delete the remote object. + # FIXME + raise return response @@ -1684,6 +1780,7 @@ def _do_upload( if_metageneration_match, if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, + checksum=None, ): """Determine an upload strategy and then perform the upload. @@ -1749,6 +1846,19 @@ def _do_upload( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + :type checksum: str + :param checksum: + (Optional) The type of checksum to compute to verify + the integrity of the object. If the upload is completed in a single + request, the checksum will be entirely precomputed and the remote + server will handle verification and error handling. If the upload + is too large and must be transmitted in multiple requests, the + checksum will be incrementally computed and the client will handle + verification and error handling, raising + google.resumable_media.common.DataCorruption on a mismatch and + attempting to delete the corrupted file. Supported values are + "md5", "crc32c" and None. The default is None. + :rtype: dict :returns: The parsed JSON from the "200 OK" response. This will be the **only** response in the multipart case and it will be the @@ -1767,6 +1877,7 @@ def _do_upload( if_metageneration_match, if_metageneration_not_match, timeout=timeout, + checksum=checksum, ) else: response = self._do_resumable_upload( @@ -1781,6 +1892,7 @@ def _do_upload( if_metageneration_match, if_metageneration_not_match, timeout=timeout, + checksum=checksum, ) return response.json() @@ -1799,6 +1911,7 @@ def upload_from_file( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + checksum=None, ): """Upload the contents of this blob from a file-like object. @@ -1893,6 +2006,19 @@ def upload_from_file( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + :type checksum: str + :param checksum: + (Optional) The type of checksum to compute to verify + the integrity of the object. If the upload is completed in a single + request, the checksum will be entirely precomputed and the remote + server will handle verification and error handling. If the upload + is too large and must be transmitted in multiple requests, the + checksum will be incrementally computed and the client will handle + verification and error handling, raising + google.resumable_media.common.DataCorruption on a mismatch and + attempting to delete the corrupted file. Supported values are + "md5", "crc32c" and None. The default is None. + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the upload response returns an error status. @@ -1919,6 +2045,7 @@ def upload_from_file( if_metageneration_match, if_metageneration_not_match, timeout=timeout, + checksum=checksum, ) self._set_properties(created_json) except resumable_media.InvalidResponse as exc: @@ -1935,6 +2062,7 @@ def upload_from_filename( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + checksum=None, ): """Upload this blob's contents from the content of a named file. @@ -2001,6 +2129,19 @@ def upload_from_filename( repeated several times using the same timeout each time. Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + + :type checksum: str + :param checksum: + (Optional) The type of checksum to compute to verify + the integrity of the object. If the upload is completed in a single + request, the checksum will be entirely precomputed and the remote + server will handle verification and error handling. If the upload + is too large and must be transmitted in multiple requests, the + checksum will be incrementally computed and the client will handle + verification and error handling, raising + google.resumable_media.common.DataCorruption on a mismatch and + attempting to delete the corrupted file. Supported values are + "md5", "crc32c" and None. The default is None. """ content_type = self._get_content_type(content_type, filename=filename) @@ -2017,6 +2158,7 @@ def upload_from_filename( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, + checksum=checksum ) def upload_from_string( @@ -2030,6 +2172,7 @@ def upload_from_string( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + checksum=None, ): """Upload contents of this blob from the provided string. @@ -2091,6 +2234,19 @@ def upload_from_string( repeated several times using the same timeout each time. Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + + :type checksum: str + :param checksum: + (Optional) The type of checksum to compute to verify + the integrity of the object. If the upload is completed in a single + request, the checksum will be entirely precomputed and the remote + server will handle verification and error handling. If the upload + is too large and must be transmitted in multiple requests, the + checksum will be incrementally computed and the client will handle + verification and error handling, raising + google.resumable_media.common.DataCorruption on a mismatch and + attempting to delete the corrupted file. Supported values are + "md5", "crc32c" and None. The default is None. """ data = _to_bytes(data, encoding="utf-8") string_buffer = BytesIO(data) @@ -2114,6 +2270,7 @@ def create_resumable_upload_session( origin=None, client=None, timeout=_DEFAULT_TIMEOUT, + checksum=None, ): """Create a resumable upload session. @@ -2179,6 +2336,16 @@ def create_resumable_upload_session( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + :type checksum: str + :param checksum: + (Optional) The type of checksum to compute to verify + the integrity of the object. After the upload is complete, the + server-computed checksum of the resulting object will be checked + and google.resumable_media.common.DataCorruption will be raised on + a mismatch. On a failure, the client will attempt to delete the + corrupted file from the remote host automatically. Supported values + are "md5", "crc32c" and None. The default is None. + :rtype: str :returns: The resumable upload session URL. The upload can be completed by making an HTTP PUT request with the @@ -2208,6 +2375,7 @@ def create_resumable_upload_session( extra_headers=extra_headers, chunk_size=self._CHUNK_SIZE_MULTIPLE, timeout=timeout, + checksum=checksum ) return upload.resumable_url diff --git a/setup.py b/setup.py index 91cb1dcc8..f5fff956c 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ dependencies = [ "google-auth >= 1.11.0, < 2.0dev", "google-cloud-core >= 1.2.0, < 2.0dev", - "google-resumable-media >= 0.6.0, < 2.0dev", + "google-resumable-media >= 0.7.0, < 2.0dev", ] extras = {} From fde82b7caf4fe06b7f7a72c0bf8f84768d5845d3 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 10 Aug 2020 17:40:56 -0700 Subject: [PATCH 2/9] incremental --- google/cloud/storage/blob.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 927dc0d00..de7896350 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -924,6 +924,7 @@ def download_to_file( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + checksum="md5", ): """Download the contents of this blob into a file-like object. @@ -1761,8 +1762,8 @@ def _do_resumable_upload( try: response = upload.transmit_next_chunk(transport, timeout=timeout) except google.resumable_media.common.DataCorruption: - # Attempt to delete the remote object. - # FIXME + # Attempt to delete the corrupted object. + self.delete() raise return response From 98e09e128158865f0506e5d0d2a704c5e1f076b4 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Wed, 12 Aug 2020 16:26:53 -0700 Subject: [PATCH 3/9] incremental --- google/cloud/storage/blob.py | 5 +++-- tests/unit/test_blob.py | 22 +++++++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index a952c28a5..7fb574b74 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -1666,6 +1666,7 @@ def _initiate_resumable_upload( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + checksum=None, ): """Initiate a resumable upload. @@ -1806,7 +1807,7 @@ def _initiate_resumable_upload( ) upload_url = _add_query_parameters(base_url, name_value_pairs) - upload = ResumableUpload(upload_url, chunk_size, headers=headers) + upload = ResumableUpload(upload_url, chunk_size, headers=headers, checksum=checksum) if num_retries is not None: upload._retry_strategy = resumable_media.RetryStrategy( @@ -1935,7 +1936,7 @@ def _do_resumable_upload( while not upload.finished: try: response = upload.transmit_next_chunk(transport, timeout=timeout) - except google.resumable_media.common.DataCorruption: + except resumable_media.DataCorruption: # Attempt to delete the corrupted object. self.delete() raise diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 68011e438..2e83f420c 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -984,11 +984,11 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): if w_range: patched.assert_called_once_with( - download_url, stream=file_obj, headers=headers, start=1, end=3 + download_url, stream=file_obj, headers=headers, start=1, end=3, checksum="md5" ) else: patched.assert_called_once_with( - download_url, stream=file_obj, headers=headers, start=None, end=None + download_url, stream=file_obj, headers=headers, start=None, end=None, checksum="md5" ) patched.return_value.consume.assert_called_once_with( @@ -1130,6 +1130,7 @@ def test_download_to_file_with_failure(self): None, False, timeout=self._get_default_timeout(), + checksum="md5" ) def test_download_to_file_wo_media_link(self): @@ -1160,6 +1161,7 @@ def test_download_to_file_wo_media_link(self): None, False, timeout=self._get_default_timeout(), + checksum="md5", ) def test_download_to_file_w_generation_match(self): @@ -1189,6 +1191,7 @@ def test_download_to_file_w_generation_match(self): None, False, timeout=self._get_default_timeout(), + checksum="md5", ) def _download_to_file_helper(self, use_chunks, raw_download, timeout=None): @@ -1226,6 +1229,7 @@ def _download_to_file_helper(self, use_chunks, raw_download, timeout=None): None, raw_download, timeout=expected_timeout, + checksum="md5", ) def test_download_to_file_wo_chunks_wo_raw(self): @@ -1291,6 +1295,7 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None): None, raw_download, timeout=expected_timeout, + checksum="md5", ) stream = blob._do_download.mock_calls[0].args[1] self.assertEqual(stream.name, temp.name) @@ -1322,6 +1327,7 @@ def test_download_to_filename_w_generation_match(self): None, False, timeout=self._get_default_timeout(), + checksum="md5", ) def test_download_to_filename_w_updated_wo_raw(self): @@ -1379,6 +1385,7 @@ def test_download_to_filename_corrupted(self): None, False, timeout=self._get_default_timeout(), + checksum="md5" ) stream = blob._do_download.mock_calls[0].args[1] self.assertEqual(stream.name, filename) @@ -1413,6 +1420,7 @@ def test_download_to_filename_w_key(self): None, False, timeout=self._get_default_timeout(), + checksum="md5", ) stream = blob._do_download.mock_calls[0].args[1] self.assertEqual(stream.name, temp.name) @@ -1444,6 +1452,7 @@ def _download_as_bytes_helper(self, raw_download, timeout=None): None, raw_download, timeout=expected_timeout, + checksum="md5", ) stream = blob._do_download.mock_calls[0].args[1] self.assertIsInstance(stream, io.BytesIO) @@ -1522,6 +1531,7 @@ def test_download_as_bytes_w_generation_match(self): if_metageneration_match=None, if_metageneration_not_match=None, timeout=self._get_default_timeout(), + checksum="md5", ) def test_download_as_bytes_wo_raw(self): @@ -1563,6 +1573,7 @@ def _download_as_text_helper(self, raw_download, encoding=None, timeout=None): None, raw_download, timeout=expected_timeout, + checksum="md5", ) stream = blob._do_download.mock_calls[0].args[1] self.assertIsInstance(stream, io.BytesIO) @@ -1591,6 +1602,7 @@ def test_download_as_text_w_generation_match(self): if_metageneration_match=None, if_metageneration_not_match=None, timeout=self._get_default_timeout(), + checksum="md5", ) def test_download_as_text_wo_raw(self): @@ -1629,6 +1641,7 @@ def test_download_as_string(self, mock_warn): if_metageneration_match=None, if_metageneration_not_match=None, timeout=self._get_default_timeout(), + checksum="md5", ) mock_warn.assert_called_with( @@ -2432,6 +2445,7 @@ def _do_upload_helper( if_metageneration_match, if_metageneration_not_match, timeout=expected_timeout, + checksum=None, ) blob._do_resumable_upload.assert_not_called() else: @@ -2448,6 +2462,7 @@ def _do_upload_helper( if_metageneration_match, if_metageneration_not_match, timeout=expected_timeout, + checksum=None, ) def test__do_upload_uses_multipart(self): @@ -2524,6 +2539,7 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): if_metageneration_match, if_metageneration_not_match, timeout=expected_timeout, + checksum=None, ) return stream @@ -2584,7 +2600,7 @@ def _do_upload_mock_call_helper( self.assertIsNone(pos_args[9]) # if_metageneration_not_match expected_timeout = self._get_default_timeout() if timeout is None else timeout - self.assertEqual(kwargs, {"timeout": expected_timeout}) + self.assertEqual(kwargs, {"timeout": expected_timeout, "checksum": None}) return pos_args[1] From 64f785aa7e0f72ca616d086564ba130fd3ff181f Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 13 Aug 2020 16:00:10 -0700 Subject: [PATCH 4/9] unit tests pass --- tests/unit/test_blob.py | 41 +++++++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 2e83f420c..f56e62395 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -1012,7 +1012,7 @@ def test__do_download_wo_chunks_w_custom_timeout(self): w_range=False, raw_download=False, timeout=9.58 ) - def _do_download_helper_w_chunks(self, w_range, raw_download, timeout=None): + def _do_download_helper_w_chunks(self, w_range, raw_download, timeout=None, checksum="md5"): blob_name = "blob-name" client = mock.Mock(_credentials=_make_credentials(), spec=["_credentials"]) bucket = _Bucket(client) @@ -1055,6 +1055,7 @@ def side_effect(*args, **kwargs): start=1, end=3, raw_download=raw_download, + checksum=checksum, **timeout_kwarg ) else: @@ -1064,6 +1065,7 @@ def side_effect(*args, **kwargs): download_url, headers, raw_download=raw_download, + checksum=checksum, **timeout_kwarg ) @@ -1094,6 +1096,18 @@ def test__do_download_w_chunks_w_range_w_raw(self): def test__do_download_w_chunks_w_custom_timeout(self): self._do_download_helper_w_chunks(w_range=True, raw_download=True, timeout=9.58) + def test__do_download_w_chunks_w_checksum(self): + from google.cloud.storage import blob as blob_module + + with mock.patch("logging.info") as patch: + self._do_download_helper_w_chunks(w_range=False, raw_download=False, checksum="md5") + patch.assert_called_once_with(blob_module._CHUNKED_DOWNLOAD_CHECKSUM_MESSAGE.format("md5")) + + def test__do_download_w_chunks_wo_checksum(self): + with mock.patch("logging.info") as patch: + self._do_download_helper_w_chunks(w_range=False, raw_download=False, checksum=None) + patch.assert_not_called() + def test_download_to_file_with_failure(self): import requests from google.resumable_media import InvalidResponse @@ -2143,7 +2157,7 @@ def test__initiate_resumable_upload_with_generation_not_match(self): def test__initiate_resumable_upload_with_predefined_acl(self): self._initiate_resumable_helper(predefined_acl="private") - def _make_resumable_transport(self, headers1, headers2, headers3, total_bytes): + def _make_resumable_transport(self, headers1, headers2, headers3, total_bytes, data_corruption=False): from google import resumable_media fake_transport = mock.Mock(spec=["request"]) @@ -2153,9 +2167,12 @@ def _make_resumable_transport(self, headers1, headers2, headers3, total_bytes): resumable_media.PERMANENT_REDIRECT, headers2 ) json_body = '{{"size": "{:d}"}}'.format(total_bytes) - fake_response3 = self._mock_requests_response( - http_client.OK, headers3, content=json_body.encode("utf-8") - ) + if data_corruption: + fake_response3 = resumable_media.DataCorruption(None) + else: + fake_response3 = self._mock_requests_response( + http_client.OK, headers3, content=json_body.encode("utf-8") + ) responses = [fake_response1, fake_response2, fake_response3] fake_transport.request.side_effect = responses @@ -2266,6 +2283,7 @@ def _do_resumable_helper( if_metageneration_match=None, if_metageneration_not_match=None, timeout=None, + data_corruption=False ): bucket = _Bucket(name="yesterday") blob = self._make_one(u"blob-name", bucket=bucket) @@ -2285,7 +2303,7 @@ def _do_resumable_helper( headers1 = {"location": resumable_url} headers2 = {"range": "bytes=0-{:d}".format(blob.chunk_size - 1)} transport, responses = self._make_resumable_transport( - headers1, headers2, {}, total_bytes + headers1, headers2, {}, total_bytes, data_corruption=data_corruption ) # Create some mock arguments and call the method under test. @@ -2359,6 +2377,7 @@ def _do_resumable_helper( ) self.assertEqual(transport.request.mock_calls, [call0, call1, call2]) + def test__do_resumable_upload_with_custom_timeout(self): self._do_resumable_helper(timeout=9.58) @@ -2374,6 +2393,16 @@ def test__do_resumable_upload_with_retry(self): def test__do_resumable_upload_with_predefined_acl(self): self._do_resumable_helper(predefined_acl="private") + def test__do_resumable_upload_with_data_corruption(self): + from google.resumable_media import DataCorruption + + with mock.patch("google.cloud.storage.blob.Blob.delete") as patch: + try: + self._do_resumable_helper(data_corruption=True) + except Exception as e: + self.assertTrue(patch.called) + self.assertIsInstance(e, DataCorruption) + def _do_upload_helper( self, chunk_size=None, From 5c2d78aff2592f6e4a3381ec9b40fb095011dc26 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Fri, 14 Aug 2020 12:10:06 -0700 Subject: [PATCH 5/9] lint and system tests --- google/cloud/storage/blob.py | 29 ++++++++----- tests/system/test_system.py | 82 ++++++++++++++++++++++++++++++++++++ tests/unit/test_blob.py | 41 +++++++++++++----- 3 files changed, 130 insertions(+), 22 deletions(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 7fb574b74..3c26d898a 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -885,7 +885,12 @@ def _do_download( klass = Download download = klass( - download_url, stream=file_obj, headers=headers, start=start, end=end, checksum=checksum + download_url, + stream=file_obj, + headers=headers, + start=start, + end=end, + checksum=checksum, ) response = download.consume(transport, timeout=timeout) self._extract_headers_from_download(response) @@ -1054,7 +1059,7 @@ def download_to_filename( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, - checksum="md5" + checksum="md5", ): """Download the contents of this blob into a named file. @@ -1159,7 +1164,7 @@ def download_as_bytes( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, - checksum="md5" + checksum="md5", ): """Download the contents of this blob as a bytes object. @@ -1807,7 +1812,9 @@ def _initiate_resumable_upload( ) upload_url = _add_query_parameters(base_url, name_value_pairs) - upload = ResumableUpload(upload_url, chunk_size, headers=headers, checksum=checksum) + upload = ResumableUpload( + upload_url, chunk_size, headers=headers, checksum=checksum + ) if num_retries is not None: upload._retry_strategy = resumable_media.RetryStrategy( @@ -1930,7 +1937,7 @@ def _do_resumable_upload( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, - checksum=checksum + checksum=checksum, ) while not upload.finished: @@ -2031,7 +2038,7 @@ def _do_upload( is too large and must be transmitted in multiple requests, the checksum will be incrementally computed and the client will handle verification and error handling, raising - google.resumable_media.common.DataCorruption on a mismatch and + google.resumable_media.common.DataCorruption on a mismatch and attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. @@ -2191,7 +2198,7 @@ def upload_from_file( is too large and must be transmitted in multiple requests, the checksum will be incrementally computed and the client will handle verification and error handling, raising - google.resumable_media.common.DataCorruption on a mismatch and + google.resumable_media.common.DataCorruption on a mismatch and attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. @@ -2315,7 +2322,7 @@ def upload_from_filename( is too large and must be transmitted in multiple requests, the checksum will be incrementally computed and the client will handle verification and error handling, raising - google.resumable_media.common.DataCorruption on a mismatch and + google.resumable_media.common.DataCorruption on a mismatch and attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. """ @@ -2334,7 +2341,7 @@ def upload_from_filename( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, - checksum=checksum + checksum=checksum, ) def upload_from_string( @@ -2420,7 +2427,7 @@ def upload_from_string( is too large and must be transmitted in multiple requests, the checksum will be incrementally computed and the client will handle verification and error handling, raising - google.resumable_media.common.DataCorruption on a mismatch and + google.resumable_media.common.DataCorruption on a mismatch and attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. """ @@ -2551,7 +2558,7 @@ def create_resumable_upload_session( extra_headers=extra_headers, chunk_size=self._CHUNK_SIZE_MULTIPLE, timeout=timeout, - checksum=checksum + checksum=checksum, ) return upload.resumable_url diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 3fb701d39..5051181c8 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -22,6 +22,7 @@ import tempfile import time import unittest +import mock import requests import six @@ -33,6 +34,8 @@ from google.cloud.storage.bucket import LifecycleRuleDelete from google.cloud.storage.bucket import LifecycleRuleSetStorageClass from google.cloud import kms +from google import resumable_media +import google.auth import google.api_core from google.api_core import path_template import google.oauth2 @@ -555,6 +558,33 @@ def test_large_file_write_from_stream(self): md5_hash = md5_hash.encode("utf-8") self.assertEqual(md5_hash, file_data["hash"]) + def test_large_file_write_from_stream_with_checksum(self): + blob = self.bucket.blob("LargeFile") + + file_data = self.FILES["big"] + with open(file_data["path"], "rb") as file_obj: + blob.upload_from_file(file_obj, checksum="crc32c") + self.case_blobs_to_delete.append(blob) + + md5_hash = blob.md5_hash + if not isinstance(md5_hash, six.binary_type): + md5_hash = md5_hash.encode("utf-8") + self.assertEqual(md5_hash, file_data["hash"]) + + def test_large_file_write_from_stream_with_failed_checksum(self): + blob = self.bucket.blob("LargeFile") + + file_data = self.FILES["big"] + + with open(file_data["path"], "rb") as file_obj: + with mock.patch( + "google.resumable_media._helpers.prepare_checksum_digest", + return_value="FFFFFF==", + ): + with self.assertRaises(resumable_media.DataCorruption): + blob.upload_from_file(file_obj, checksum="crc32c") + self.assertFalse(blob.exists()) + def test_large_encrypted_file_write_from_stream(self): blob = self.bucket.blob("LargeFile", encryption_key=self.ENCRYPTION_KEY) @@ -589,6 +619,31 @@ def test_small_file_write_from_filename(self): md5_hash = md5_hash.encode("utf-8") self.assertEqual(md5_hash, file_data["hash"]) + def test_small_file_write_from_filename_with_checksum(self): + blob = self.bucket.blob("SmallFile") + + file_data = self.FILES["simple"] + blob.upload_from_filename(file_data["path"], checksum="crc32c") + self.case_blobs_to_delete.append(blob) + + md5_hash = blob.md5_hash + if not isinstance(md5_hash, six.binary_type): + md5_hash = md5_hash.encode("utf-8") + self.assertEqual(md5_hash, file_data["hash"]) + + def test_small_file_write_from_filename_with_failed_checksum(self): + blob = self.bucket.blob("SmallFile") + + file_data = self.FILES["simple"] + with mock.patch( + "google.resumable_media._helpers.prepare_checksum_digest", + return_value="FFFFFF==", + ): + with self.assertRaises(google.api_core.exceptions.BadRequest): + blob.upload_from_filename(file_data["path"], checksum="crc32c") + + self.assertFalse(blob.exists()) + @unittest.skipUnless(USER_PROJECT, "USER_PROJECT not set in environment.") def test_crud_blob_w_user_project(self): with_user_project = Config.CLIENT.bucket( @@ -814,6 +869,33 @@ def test_download_w_generation_match(self): self.assertEqual(file_contents, stored_contents) + def test_download_w_failed_crc32c_checksum(self): + blob = self.bucket.blob("FailedChecksumBlob") + file_contents = b"Hello World" + blob.upload_from_string(file_contents) + self.case_blobs_to_delete.append(blob) + + temp_filename = tempfile.mktemp() + + # Intercept the digest processing a the last stage and replace it with garbage + with mock.patch( + "google.resumable_media._helpers.prepare_checksum_digest", + return_value="FFFFFF==", + ): + with self.assertRaises(resumable_media.DataCorruption): + blob.download_to_filename(temp_filename, checksum="crc32c") + + # Confirm the file was deleted on failure + self.assertFalse(os.path.isfile(temp_filename)) + + # Now download with checksumming turned off + blob.download_to_filename(temp_filename, checksum=None) + + with open(temp_filename, "rb") as file_obj: + stored_contents = file_obj.read() + + self.assertEqual(file_contents, stored_contents) + def test_copy_existing_file(self): filename = self.FILES["logo"]["path"] blob = storage.Blob("CloudLogo", bucket=self.bucket) diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index f56e62395..97a6e2988 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -984,11 +984,21 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): if w_range: patched.assert_called_once_with( - download_url, stream=file_obj, headers=headers, start=1, end=3, checksum="md5" + download_url, + stream=file_obj, + headers=headers, + start=1, + end=3, + checksum="md5", ) else: patched.assert_called_once_with( - download_url, stream=file_obj, headers=headers, start=None, end=None, checksum="md5" + download_url, + stream=file_obj, + headers=headers, + start=None, + end=None, + checksum="md5", ) patched.return_value.consume.assert_called_once_with( @@ -1012,7 +1022,9 @@ def test__do_download_wo_chunks_w_custom_timeout(self): w_range=False, raw_download=False, timeout=9.58 ) - def _do_download_helper_w_chunks(self, w_range, raw_download, timeout=None, checksum="md5"): + def _do_download_helper_w_chunks( + self, w_range, raw_download, timeout=None, checksum="md5" + ): blob_name = "blob-name" client = mock.Mock(_credentials=_make_credentials(), spec=["_credentials"]) bucket = _Bucket(client) @@ -1100,12 +1112,18 @@ def test__do_download_w_chunks_w_checksum(self): from google.cloud.storage import blob as blob_module with mock.patch("logging.info") as patch: - self._do_download_helper_w_chunks(w_range=False, raw_download=False, checksum="md5") - patch.assert_called_once_with(blob_module._CHUNKED_DOWNLOAD_CHECKSUM_MESSAGE.format("md5")) + self._do_download_helper_w_chunks( + w_range=False, raw_download=False, checksum="md5" + ) + patch.assert_called_once_with( + blob_module._CHUNKED_DOWNLOAD_CHECKSUM_MESSAGE.format("md5") + ) def test__do_download_w_chunks_wo_checksum(self): with mock.patch("logging.info") as patch: - self._do_download_helper_w_chunks(w_range=False, raw_download=False, checksum=None) + self._do_download_helper_w_chunks( + w_range=False, raw_download=False, checksum=None + ) patch.assert_not_called() def test_download_to_file_with_failure(self): @@ -1144,7 +1162,7 @@ def test_download_to_file_with_failure(self): None, False, timeout=self._get_default_timeout(), - checksum="md5" + checksum="md5", ) def test_download_to_file_wo_media_link(self): @@ -1399,7 +1417,7 @@ def test_download_to_filename_corrupted(self): None, False, timeout=self._get_default_timeout(), - checksum="md5" + checksum="md5", ) stream = blob._do_download.mock_calls[0].args[1] self.assertEqual(stream.name, filename) @@ -2157,7 +2175,9 @@ def test__initiate_resumable_upload_with_generation_not_match(self): def test__initiate_resumable_upload_with_predefined_acl(self): self._initiate_resumable_helper(predefined_acl="private") - def _make_resumable_transport(self, headers1, headers2, headers3, total_bytes, data_corruption=False): + def _make_resumable_transport( + self, headers1, headers2, headers3, total_bytes, data_corruption=False + ): from google import resumable_media fake_transport = mock.Mock(spec=["request"]) @@ -2283,7 +2303,7 @@ def _do_resumable_helper( if_metageneration_match=None, if_metageneration_not_match=None, timeout=None, - data_corruption=False + data_corruption=False, ): bucket = _Bucket(name="yesterday") blob = self._make_one(u"blob-name", bucket=bucket) @@ -2377,7 +2397,6 @@ def _do_resumable_helper( ) self.assertEqual(transport.request.mock_calls, [call0, call1, call2]) - def test__do_resumable_upload_with_custom_timeout(self): self._do_resumable_helper(timeout=9.58) From 379d57b24b991f686e5e09aa75a8472ff7bcd32a Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Fri, 14 Aug 2020 12:50:53 -0700 Subject: [PATCH 6/9] fix docs formatting --- google/cloud/storage/blob.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index f594eece0..977f6ef6b 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -867,7 +867,7 @@ def _do_download( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. - :type checksum str + :type checksum: str :param checksum: (Optional) The type of checksum to compute to verify the integrity of the object. The response headers must contain a checksum of the @@ -1006,7 +1006,7 @@ def download_to_file( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. - :type checksum str + :type checksum: str :param checksum: (Optional) The type of checksum to compute to verify the integrity of the object. The response headers must contain a checksum of the @@ -1112,7 +1112,7 @@ def download_to_filename( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. - :type checksum str + :type checksum: str :param checksum: (Optional) The type of checksum to compute to verify the integrity of the object. The response headers must contain a checksum of the @@ -1214,7 +1214,7 @@ def download_as_bytes( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. - :type checksum str + :type checksum: str :param checksum: (Optional) The type of checksum to compute to verify the integrity of the object. The response headers must contain a checksum of the From b7f728538ab4fe95b1f9cbf0459e7f3ab3f49e8a Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 17 Aug 2020 09:42:05 -0700 Subject: [PATCH 7/9] comment mock strategy --- tests/system/test_system.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index db2593070..053bd7ef7 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -576,6 +576,7 @@ def test_large_file_write_from_stream_with_failed_checksum(self): file_data = self.FILES["big"] + # Intercept the digest processing at the last stage and replace it with garbage with open(file_data["path"], "rb") as file_obj: with mock.patch( "google.resumable_media._helpers.prepare_checksum_digest", @@ -635,6 +636,7 @@ def test_small_file_write_from_filename_with_failed_checksum(self): blob = self.bucket.blob("SmallFile") file_data = self.FILES["simple"] + # Intercept the digest processing at the last stage and replace it with garbage with mock.patch( "google.resumable_media._helpers.prepare_checksum_digest", return_value="FFFFFF==", @@ -877,7 +879,7 @@ def test_download_w_failed_crc32c_checksum(self): temp_filename = tempfile.mktemp() - # Intercept the digest processing a the last stage and replace it with garbage + # Intercept the digest processing at the last stage and replace it with garbage with mock.patch( "google.resumable_media._helpers.prepare_checksum_digest", return_value="FFFFFF==", From a5540473db5cc03305ffe8023393d3e524281594 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 24 Aug 2020 16:55:26 -0700 Subject: [PATCH 8/9] Test update --- tests/system/test_system.py | 39 +++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 76a98def1..4676f53b1 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -576,7 +576,10 @@ def test_large_file_write_from_stream_with_failed_checksum(self): file_data = self.FILES["big"] - # Intercept the digest processing at the last stage and replace it with garbage + # Intercept the digest processing at the last stage and replace it with garbage. + # This is done with a patch to monkey-patch the resumable media library's checksum + # processing; it does not mock a remote interface like a unit test would. The + # remote API is still exercised. with open(file_data["path"], "rb") as file_obj: with mock.patch( "google.resumable_media._helpers.prepare_checksum_digest", @@ -881,26 +884,28 @@ def test_download_w_failed_crc32c_checksum(self): blob.upload_from_string(file_contents) self.case_blobs_to_delete.append(blob) - temp_filename = tempfile.mktemp() - - # Intercept the digest processing at the last stage and replace it with garbage - with mock.patch( - "google.resumable_media._helpers.prepare_checksum_digest", - return_value="FFFFFF==", - ): - with self.assertRaises(resumable_media.DataCorruption): - blob.download_to_filename(temp_filename, checksum="crc32c") + with tempfile.NamedTemporaryFile() as temp_f: + # Intercept the digest processing at the last stage and replace it with garbage. + # This is done with a patch to monkey-patch the resumable media library's checksum + # processing; it does not mock a remote interface like a unit test would. The + # remote API is still exercised. + with mock.patch( + "google.resumable_media._helpers.prepare_checksum_digest", + return_value="FFFFFF==", + ): + with self.assertRaises(resumable_media.DataCorruption): + blob.download_to_filename(temp_f.name, checksum="crc32c") - # Confirm the file was deleted on failure - self.assertFalse(os.path.isfile(temp_filename)) + # Confirm the file was deleted on failure + self.assertFalse(os.path.isfile(temp_f.name)) - # Now download with checksumming turned off - blob.download_to_filename(temp_filename, checksum=None) + # Now download with checksumming turned off + blob.download_to_filename(temp_f.name, checksum=None) - with open(temp_filename, "rb") as file_obj: - stored_contents = file_obj.read() + with open(temp_f.name, "rb") as file_obj: + stored_contents = file_obj.read() - self.assertEqual(file_contents, stored_contents) + self.assertEqual(file_contents, stored_contents) def test_copy_existing_file(self): filename = self.FILES["logo"]["path"] From 8489549b2eff03f2d82dd7fcf80134e099edcf22 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 25 Aug 2020 11:26:32 -0700 Subject: [PATCH 9/9] update docstring and bump resumable media dependency --- google/cloud/storage/blob.py | 12 ++++++------ setup.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 977f6ef6b..acac4946a 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -1752,8 +1752,8 @@ def _initiate_resumable_upload( the integrity of the object. After the upload is complete, the server-computed checksum of the resulting object will be checked and google.resumable_media.common.DataCorruption will be raised on - a mismatch. On a failure, the client will attempt to delete the - corrupted file from the remote host automatically. Supported values + a mismatch. On a validation failure, the client will attempt to + delete the uploaded object automatically. Supported values are "md5", "crc32c" and None. The default is None. :rtype: tuple @@ -1917,8 +1917,8 @@ def _do_resumable_upload( the integrity of the object. After the upload is complete, the server-computed checksum of the resulting object will be checked and google.resumable_media.common.DataCorruption will be raised on - a mismatch. On a failure, the client will attempt to delete the - corrupted file from the remote host automatically. Supported values + a mismatch. On a validation failure, the client will attempt to + delete the uploaded object automatically. Supported values are "md5", "crc32c" and None. The default is None. :rtype: :class:`~requests.Response` @@ -2525,8 +2525,8 @@ def create_resumable_upload_session( the integrity of the object. After the upload is complete, the server-computed checksum of the resulting object will be checked and google.resumable_media.common.DataCorruption will be raised on - a mismatch. On a failure, the client will attempt to delete the - corrupted file from the remote host automatically. Supported values + a mismatch. On a validation failure, the client will attempt to + delete the uploaded object automatically. Supported values are "md5", "crc32c" and None. The default is None. :rtype: str diff --git a/setup.py b/setup.py index b4362d0d2..ce0ebbecf 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ dependencies = [ "google-auth >= 1.11.0, < 2.0dev", "google-cloud-core >= 1.4.1, < 2.0dev", - "google-resumable-media >= 0.7.0, < 2.0dev", + "google-resumable-media >= 1.0.0, < 2.0dev", ] extras = {}