Skip to content
This repository was archived by the owner on Mar 2, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ def system(session):
system_test_path = os.path.join("tests", "system.py")
system_test_folder_path = os.path.join("tests", "system")
# Sanity check: Only run tests if the environment variable is set.
if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", ""):
session.skip("Credentials must be set via environment variable")
is_emulator = 'DATASTORE_DATASET' in os.environ
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
is_emulator = 'DATASTORE_DATASET' in os.environ
is_emulator = "DATASTORE_DATASET" in os.environ

has_credentials = bool(os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", ""))
if not is_emulator and not has_credentials:
session.skip("Credentials must be set via environment variable for non-emulator tests")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
session.skip("Credentials must be set via environment variable for non-emulator tests")
session.skip(
"Credentials must be set via environment variable for non-emulator tests"
)


system_test_exists = os.path.exists(system_test_path)
system_test_folder_exists = os.path.exists(system_test_folder_path)
Expand Down
12 changes: 7 additions & 5 deletions tests/system/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import requests
import six

import google.auth.credentials
from google.cloud._helpers import UTC
from google.cloud import datastore
from google.cloud.datastore.helpers import GeoPoint
from google.cloud.environment_vars import GCD_DATASET
from google.cloud.exceptions import Conflict

from test_utils.system import EmulatorCreds
from test_utils.system import unique_resource_id

from tests.system.utils import clear_datastore
Expand Down Expand Up @@ -59,7 +59,7 @@ def setUpModule():
if emulator_dataset is None:
Config.CLIENT = datastore.Client(namespace=test_namespace)
else:
credentials = EmulatorCreds()
credentials = google.auth.credentials.AnonymousCredentials()
http = requests.Session() # Un-authorized.
Config.CLIENT = datastore.Client(
project=emulator_dataset,
Expand Down Expand Up @@ -243,6 +243,8 @@ def setUpClass(cls):
if os.getenv(GCD_DATASET) is not None:
# Populate the datastore with the cloned client.
populate_datastore.add_characters(client=cls.CLIENT)
populate_datastore.add_uid_keys(client=cls.CLIENT)
populate_datastore.add_timestamp_keys(client=cls.CLIENT)

cls.CHARACTERS = populate_datastore.CHARACTERS
# Use the client for this test instead of the global.
Expand Down Expand Up @@ -374,7 +376,7 @@ def test_query_paginate_simple_uuid_keys(self):
self.assertNotIn(uuid_str, seen, uuid_str)
seen.add(uuid_str)

self.assertTrue(page_count > 1)
self.assertGreater(page_count, 1)

def test_query_paginate_simple_timestamp_keys(self):

Expand All @@ -391,7 +393,7 @@ def test_query_paginate_simple_timestamp_keys(self):
self.assertNotIn(timestamp, seen, timestamp)
seen.add(timestamp)

self.assertTrue(page_count > 1)
self.assertGreater(page_count, 1)

def test_query_offset_timestamp_keys(self):
# See issue #4675
Expand Down Expand Up @@ -495,6 +497,7 @@ def _base_query(self):
def _verify(self, limit, offset, expected):
# Query used for all tests
page_query = self._base_query()
page_query.keys_only() # Work around bug in the datastore emulator where it fails to take into account the size of the response (and fails due to a grpc response too large error).
page_query.add_filter("family", "=", "Stark")
page_query.add_filter("alive", "=", False)

Expand Down Expand Up @@ -601,7 +604,6 @@ def test_empty_array_put(self):
local_client = clone_client(Config.CLIENT)

key = local_client.key("EmptyArray", 1234)
local_client = datastore.Client()
entity = datastore.Entity(key=key)
entity["children"] = []
local_client.put(entity)
Expand Down
16 changes: 15 additions & 1 deletion tests/system/utils/clear_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,24 @@ def remove_kind(kind, client):


def remove_all_entities(client):
BATCH_SIZE = 500 # Datastore API only allows 500 mutations in a single call.
KEY_BYTES_LIMIT = 3 << 20 # grpc limit is ~4MiB, so use a 3MiB limit (to work around any encoding issues)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
KEY_BYTES_LIMIT = 3 << 20 # grpc limit is ~4MiB, so use a 3MiB limit (to work around any encoding issues)
KEY_BYTES_LIMIT = (
3 << 20
) # grpc limit is ~4MiB, so use a 3MiB limit (to work around any encoding issues)


query = client.query()
query.keys_only()
results = list(query.fetch())
keys = [entity.key for entity in results]
client.delete_multi(keys)
while keys:
key_bytes = 0
batch = []
# Grab keys to delete, while ensuring we stay within our bounds.
while len(batch) < BATCH_SIZE and key_bytes < KEY_BYTES_LIMIT and keys:
batch.append(keys.pop())
if batch[-1].name is None:
key_bytes += 9 # It takes 9 bytes for the largest varint encoded number
else:
key_bytes += len(batch[-1].name)
client.delete_multi(batch)


def main():
Expand Down
104 changes: 67 additions & 37 deletions tests/system/utils/populate_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,55 +63,84 @@ def print_func(message):
print(message)


def _estimate_entity_size(entity):
def _estimate_value_size(value):
if isinstance(value, six.integer_types):
return 9 # Max varint size
elif isinstance(value, float):
return 8
elif isinstance(value, six.string_types) or isinstance(value, six.binary_type):
return len(value)
elif isinstance(value, (list, tuple)):
return sum(_estimate_entity_size(elem) for elem in value)
elif isinstance(value, dict):
return _estimate_entity_size(value)
result = 0
for key, value in entity.items():
result += len(key) # The number of runes is fine, no point forcing a utf-8 encoding here.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
result += len(key) # The number of runes is fine, no point forcing a utf-8 encoding here.
result += len(
key
) # The number of runes is fine, no point forcing a utf-8 encoding here.

result += _estimate_value_size(value)
return result

def add_large_character_entities(client=None):
TOTAL_OBJECTS = 2500
NAMESPACE = "LargeCharacterEntity"
KIND = "LargeCharacter"
MAX_STRING = (string.ascii_lowercase * 58)[:1500]

BATCH_SIZE = 500 # Datastore API only allows 500 mutations in a single call.
RPC_BYTES_LIMIT = 3 << 20 # grpc limit is ~4MiB, so use a 3MiB limit (to work around any encoding issues)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
RPC_BYTES_LIMIT = 3 << 20 # grpc limit is ~4MiB, so use a 3MiB limit (to work around any encoding issues)
RPC_BYTES_LIMIT = (
3 << 20
) # grpc limit is ~4MiB, so use a 3MiB limit (to work around any encoding issues)


client.namespace = NAMESPACE

# Query used for all tests
page_query = client.query(kind=KIND, namespace=NAMESPACE)
page_query.keys_only()

def put_objects(count):
current = 0

# Can only do 500 operations in a transaction with an overall
# size limit.
ENTITIES_TO_BATCH = 25
while current < count:
start = current
end = min(current + ENTITIES_TO_BATCH, count)
with client.transaction() as xact:
# The name/ID for the new entity
for i in range(start, end):
name = "character{0:05d}".format(i)
# The Cloud Datastore key for the new entity
task_key = client.key(KIND, name)

# Prepares the new entity
task = datastore.Entity(key=task_key)
task["name"] = "{0:05d}".format(i)
task["family"] = "Stark"
task["alive"] = False

for i in string.ascii_lowercase:
task["space-{}".format(i)] = MAX_STRING

# Saves the entity
xact.put(task)
current += ENTITIES_TO_BATCH

# Ensure we have 1500 entities for tests. If not, clean up type and add
remaining = count
entities = []
# The name/ID for the new entity
for i in range(count):
name = "character{0:05d}".format(i)
# The Cloud Datastore key for the new entity
task_key = client.key(KIND, name)

# Prepares the new entity
task = datastore.Entity(key=task_key)
task["name"] = "{0:05d}".format(i)
task["family"] = "Stark"
task["alive"] = False
for i in string.ascii_lowercase:
task["space-{}".format(i)] = MAX_STRING
entities.append(task)

# Now let's try to insert all of the entities, in batches.
while entities:
approx_rpc_bytes = 0
batch = []
while entities and len(batch) < BATCH_SIZE and approx_rpc_bytes < RPC_BYTES_LIMIT:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
while entities and len(batch) < BATCH_SIZE and approx_rpc_bytes < RPC_BYTES_LIMIT:
while (
entities
and len(batch) < BATCH_SIZE
and approx_rpc_bytes < RPC_BYTES_LIMIT
):

batch.append(entities.pop())
approx_rpc_bytes += _estimate_entity_size(batch[-1])
# These entities are all in different entity groups, so there is no
# benefit in placing them in a transaction.
client.put_multi(batch)

# Ensure we have exactly 2500 entities for tests. If not, delete every entity
# of this kind and add new entities equal to TOTAL_OBJECTS
all_entities = [e for e in page_query.fetch()]
if len(all_entities) != TOTAL_OBJECTS:
# Cleanup Collection if not an exact match
while all_entities:
entities = all_entities[:500]
all_entities = all_entities[500:]
client.delete_multi([e.key for e in entities])
all_keys = [e.key for e in page_query.fetch()]
if len(all_keys) != TOTAL_OBJECTS:
# Remove all of the entities of this kind that exist in this namespace.
while all_keys:
key_bytes = 0
batch = []
# Grab keys to delete, while ensuring we stay within our bounds.
while len(batch) < BATCH_SIZE and key_bytes < RPC_BYTES_LIMIT and all_keys:
batch.append(all_keys.pop())
if batch[-1].name is None:
key_bytes += 9 # It takes 9 bytes for the largest varint encoded number
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
key_bytes += 9 # It takes 9 bytes for the largest varint encoded number
key_bytes += (
9 # It takes 9 bytes for the largest varint encoded number
)

else:
key_bytes += len(batch[-1].name)
client.delete_multi(batch)
# Put objects
put_objects(TOTAL_OBJECTS)

Expand Down Expand Up @@ -156,8 +185,9 @@ def add_timestamp_keys(client=None):
# Get a client that uses the test dataset.
client = datastore.Client()

num_batches = 2
num_batches = 21
batch_size = 500
assert num_batches * batch_size > 10000, 'test_query_offset_timestamp_keys requires at least 10k entries, otherwise it fails'
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert num_batches * batch_size > 10000, 'test_query_offset_timestamp_keys requires at least 10k entries, otherwise it fails'
assert (
num_batches * batch_size > 10000
), "test_query_offset_timestamp_keys requires at least 10k entries, otherwise it fails"


timestamp_micros = set()
for batch_num in range(num_batches):
Expand Down