diff --git a/noxfile.py b/noxfile.py
index 187124ab..782ff30a 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -101,8 +101,10 @@ def system(session):
     system_test_path = os.path.join("tests", "system.py")
     system_test_folder_path = os.path.join("tests", "system")
     # Sanity check: Only run tests if the environment variable is set.
-    if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", ""):
-        session.skip("Credentials must be set via environment variable")
+    is_emulator = 'DATASTORE_DATASET' in os.environ
+    has_credentials = bool(os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", ""))
+    if not is_emulator and not has_credentials:
+        session.skip("Credentials must be set via environment variable for non-emulator tests")
 
     system_test_exists = os.path.exists(system_test_path)
     system_test_folder_exists = os.path.exists(system_test_folder_path)
diff --git a/tests/system/test_system.py b/tests/system/test_system.py
index 577bd748..a1a49304 100644
--- a/tests/system/test_system.py
+++ b/tests/system/test_system.py
@@ -19,13 +19,13 @@
 import requests
 import six
 
+import google.auth.credentials
 from google.cloud._helpers import UTC
 from google.cloud import datastore
 from google.cloud.datastore.helpers import GeoPoint
 from google.cloud.environment_vars import GCD_DATASET
 from google.cloud.exceptions import Conflict
 
-from test_utils.system import EmulatorCreds
 from test_utils.system import unique_resource_id
 
 from tests.system.utils import clear_datastore
@@ -59,7 +59,7 @@ def setUpModule():
     if emulator_dataset is None:
         Config.CLIENT = datastore.Client(namespace=test_namespace)
     else:
-        credentials = EmulatorCreds()
+        credentials = google.auth.credentials.AnonymousCredentials()
         http = requests.Session()  # Un-authorized.
         Config.CLIENT = datastore.Client(
             project=emulator_dataset,
@@ -243,6 +243,8 @@ def setUpClass(cls):
         if os.getenv(GCD_DATASET) is not None:
             # Populate the datastore with the cloned client.
             populate_datastore.add_characters(client=cls.CLIENT)
+            populate_datastore.add_uid_keys(client=cls.CLIENT)
+            populate_datastore.add_timestamp_keys(client=cls.CLIENT)
         cls.CHARACTERS = populate_datastore.CHARACTERS
 
         # Use the client for this test instead of the global.
@@ -374,7 +376,7 @@ def test_query_paginate_simple_uuid_keys(self):
 
                 self.assertNotIn(uuid_str, seen, uuid_str)
                 seen.add(uuid_str)
 
-        self.assertTrue(page_count > 1)
+        self.assertGreater(page_count, 1)
 
     def test_query_paginate_simple_timestamp_keys(self):
@@ -391,7 +393,7 @@ def test_query_paginate_simple_timestamp_keys(self):
                 self.assertNotIn(timestamp, seen, timestamp)
                 seen.add(timestamp)
 
-        self.assertTrue(page_count > 1)
+        self.assertGreater(page_count, 1)
 
     def test_query_offset_timestamp_keys(self):
         # See issue #4675
@@ -495,6 +497,7 @@ def _base_query(self):
 
     def _verify(self, limit, offset, expected):
         # Query used for all tests
         page_query = self._base_query()
+        page_query.keys_only()  # Work around bug in the datastore emulator where it fails to take into account the size of the response (and fails due to a grpc response too large error).
         page_query.add_filter("family", "=", "Stark")
         page_query.add_filter("alive", "=", False)
@@ -601,7 +604,6 @@ def test_empty_array_put(self):
         local_client = clone_client(Config.CLIENT)
         key = local_client.key("EmptyArray", 1234)
 
-        local_client = datastore.Client()
         entity = datastore.Entity(key=key)
         entity["children"] = []
         local_client.put(entity)
diff --git a/tests/system/utils/clear_datastore.py b/tests/system/utils/clear_datastore.py
index 3438ff89..2ae94d92 100644
--- a/tests/system/utils/clear_datastore.py
+++ b/tests/system/utils/clear_datastore.py
@@ -81,10 +81,24 @@ def remove_kind(kind, client):
 
 
 def remove_all_entities(client):
+    BATCH_SIZE = 500  # Datastore API only allows 500 mutations in a single call.
+    KEY_BYTES_LIMIT = 3 << 20  # grpc limit is ~4MiB, so use a 3MiB limit (to work around any encoding issues)
+
     query = client.query()
+    query.keys_only()
     results = list(query.fetch())
     keys = [entity.key for entity in results]
-    client.delete_multi(keys)
+    while keys:
+        key_bytes = 0
+        batch = []
+        # Grab keys to delete, while ensuring we stay within our bounds.
+        while len(batch) < BATCH_SIZE and key_bytes < KEY_BYTES_LIMIT and keys:
+            batch.append(keys.pop())
+            if batch[-1].name is None:
+                key_bytes += 9  # It takes 9 bytes for the largest varint encoded number
+            else:
+                key_bytes += len(batch[-1].name)
+        client.delete_multi(batch)
 
 
 def main():
diff --git a/tests/system/utils/populate_datastore.py b/tests/system/utils/populate_datastore.py
index e8e1574a..d1921e43 100644
--- a/tests/system/utils/populate_datastore.py
+++ b/tests/system/utils/populate_datastore.py
@@ -63,55 +63,84 @@ def print_func(message):
         print(message)
 
 
+def _estimate_entity_size(entity):
+    def _estimate_value_size(value):
+        if isinstance(value, six.integer_types):
+            return 9  # Max varint size
+        elif isinstance(value, float):
+            return 8
+        elif isinstance(value, six.string_types) or isinstance(value, six.binary_type):
+            return len(value)
+        elif isinstance(value, (list, tuple)):
+            return sum(_estimate_entity_size(elem) for elem in value)
+        elif isinstance(value, dict):
+            return _estimate_entity_size(value)
+    result = 0
+    for key, value in entity.items():
+        result += len(key)  # The number of runes is fine, no point forcing a utf-8 encoding here.
+        result += _estimate_value_size(value)
+    return result
+
 def add_large_character_entities(client=None):
     TOTAL_OBJECTS = 2500
     NAMESPACE = "LargeCharacterEntity"
     KIND = "LargeCharacter"
     MAX_STRING = (string.ascii_lowercase * 58)[:1500]
 
+    BATCH_SIZE = 500  # Datastore API only allows 500 mutations in a single call.
+    RPC_BYTES_LIMIT = 3 << 20  # grpc limit is ~4MiB, so use a 3MiB limit (to work around any encoding issues)
+
     client.namespace = NAMESPACE
 
     # Query used for all tests
     page_query = client.query(kind=KIND, namespace=NAMESPACE)
+    page_query.keys_only()
 
     def put_objects(count):
-        current = 0
-
-        # Can only do 500 operations in a transaction with an overall
-        # size limit.
-        ENTITIES_TO_BATCH = 25
-        while current < count:
-            start = current
-            end = min(current + ENTITIES_TO_BATCH, count)
-            with client.transaction() as xact:
-                # The name/ID for the new entity
-                for i in range(start, end):
-                    name = "character{0:05d}".format(i)
-                    # The Cloud Datastore key for the new entity
-                    task_key = client.key(KIND, name)
-
-                    # Prepares the new entity
-                    task = datastore.Entity(key=task_key)
-                    task["name"] = "{0:05d}".format(i)
-                    task["family"] = "Stark"
-                    task["alive"] = False
-
-                    for i in string.ascii_lowercase:
-                        task["space-{}".format(i)] = MAX_STRING
-
-                    # Saves the entity
-                    xact.put(task)
-            current += ENTITIES_TO_BATCH
-
-    # Ensure we have 1500 entities for tests. If not, clean up type and add
+        remaining = count
+        entities = []
+        # The name/ID for the new entity
+        for i in range(count):
+            name = "character{0:05d}".format(i)
+            # The Cloud Datastore key for the new entity
+            task_key = client.key(KIND, name)
+
+            # Prepares the new entity
+            task = datastore.Entity(key=task_key)
+            task["name"] = "{0:05d}".format(i)
+            task["family"] = "Stark"
+            task["alive"] = False
+            for i in string.ascii_lowercase:
+                task["space-{}".format(i)] = MAX_STRING
+            entities.append(task)
+
+        # Now let's try to insert all of the entities, in batches.
+        while entities:
+            approx_rpc_bytes = 0
+            batch = []
+            while entities and len(batch) < BATCH_SIZE and approx_rpc_bytes < RPC_BYTES_LIMIT:
+                batch.append(entities.pop())
+                approx_rpc_bytes += _estimate_entity_size(batch[-1])
+            # These entities are all in different entity groups, so there is no
+            # benefit in placing them in a transaction.
+            client.put_multi(batch)
+
+    # Ensure we have 2500 entities for tests. If not, clean up type and add
     # new entities equal to TOTAL_OBJECTS
-    all_entities = [e for e in page_query.fetch()]
-    if len(all_entities) != TOTAL_OBJECTS:
-        # Cleanup Collection if not an exact match
-        while all_entities:
-            entities = all_entities[:500]
-            all_entities = all_entities[500:]
-            client.delete_multi([e.key for e in entities])
+    all_keys = [e.key for e in page_query.fetch()]
+    if len(all_keys) != TOTAL_OBJECTS:
+        # Remove all of the entities that exist of this kind in this namespace.
+        while all_keys:
+            key_bytes = 0
+            batch = []
+            # Grab keys to delete, while ensuring we stay within our bounds.
+            while len(batch) < BATCH_SIZE and key_bytes < RPC_BYTES_LIMIT and all_keys:
+                batch.append(all_keys.pop())
+                if batch[-1].name is None:
+                    key_bytes += 9  # It takes 9 bytes for the largest varint encoded number
+                else:
+                    key_bytes += len(batch[-1].name)
+            client.delete_multi(batch)
 
     # Put objects
     put_objects(TOTAL_OBJECTS)
@@ -156,8 +185,9 @@ def add_timestamp_keys(client=None):
         # Get a client that uses the test dataset.
         client = datastore.Client()
 
-    num_batches = 2
+    num_batches = 21
     batch_size = 500
+    assert num_batches * batch_size > 10000, 'test_query_offset_timestamp_keys requires at least 10k entries, otherwise it fails'
 
     timestamp_micros = set()
     for batch_num in range(num_batches):