23 changes: 22 additions & 1 deletion google/cloud/bigquery/table.py
@@ -18,12 +18,15 @@
import json
import os

import httplib2
import six

from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _microseconds_from_datetime
from google.cloud._helpers import _millis_from_datetime
from google.cloud.exceptions import NotFound
from google.cloud.exceptions import make_exception
from google.cloud.streaming.exceptions import HttpError
from google.cloud.streaming.http_wrapper import Request
from google.cloud.streaming.http_wrapper import make_api_request
from google.cloud.streaming.transfer import RESUMABLE_UPLOAD
@@ -776,6 +779,16 @@ def insert_data(self,

        return errors

    @staticmethod
    def _check_response_error(request, http_response):
        """Helper for :meth:`upload_from_file`."""
        info = http_response.info
        status = int(info['status'])
        if not 200 <= status < 300:
            faux_response = httplib2.Response({'status': status})
            raise make_exception(faux_response, http_response.content,
                                 error_info=request.url)

    # pylint: disable=too-many-arguments,too-many-locals
    def upload_from_file(self,
                         file_obj,
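To make the new helper's behavior concrete (this sketch is not part of the diff): for any non-2xx status it wraps the status in an `httplib2.Response` and hands it to `google.cloud.exceptions.make_exception`, which picks the exception class matching the status code, e.g. `BadRequest` for 400. A minimal, self-contained sketch, with hypothetical stand-ins for the streaming layer's request/response objects:

```python
import httplib2
from google.cloud.exceptions import BadRequest, make_exception


# Hypothetical stand-ins for the streaming layer's request/response pair.
class _FakeRequest(object):
    url = 'https://www.googleapis.com/upload/bigquery/v2/projects/p/jobs'


class _FakeResponse(object):
    info = {'status': '400'}  # any non-2xx status triggers the helper
    content = b'{"error": {"message": "bad load job"}}'


def _check_response_error(request, http_response):
    # Same body as the new Table._check_response_error staticmethod.
    info = http_response.info
    status = int(info['status'])
    if not 200 <= status < 300:
        faux_response = httplib2.Response({'status': status})
        raise make_exception(faux_response, http_response.content,
                             error_info=request.url)


try:
    _check_response_error(_FakeRequest(), _FakeResponse())
except BadRequest as exc:
    print(exc)  # "400 bad load job (https://...jobs)"
```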
@@ -947,13 +960,21 @@ def upload_from_file(self,
        request.url = connection.build_api_url(api_base_url=base_url,
                                               path=path,
                                               query_params=query_params)
        upload.initialize_upload(request, connection.http)
        try:
            upload.initialize_upload(request, connection.http)
        except HttpError as err_response:
            faux_response = httplib2.Response(err_response.response)
            raise make_exception(faux_response, err_response.content,
                                 error_info=request.url)

        if upload.strategy == RESUMABLE_UPLOAD:
            http_response = upload.stream_file(use_chunks=True)
        else:
            http_response = make_api_request(connection.http, request,
                                             retries=num_retries)

        self._check_response_error(request, http_response)

        response_content = http_response.content
        if not isinstance(response_content,
                          six.string_types):  # pragma: NO COVER Python3
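On the resumable path the failure surfaces earlier: `upload.initialize_upload` raises the streaming layer's `HttpError`, whose `response` attribute is a dict of response headers (including `'status'`) and whose `content` is the body. The new except-clause rebuilds an `httplib2.Response` from that dict so `make_exception` can map it exactly as the multipart path does. Restated below as a free function with hypothetical names:

```python
import httplib2
from google.cloud.exceptions import make_exception


def _streaming_error_to_cloud_error(err_response, request_url):
    """Hypothetical restatement of the new except-clause body.

    ``err_response`` is a ``google.cloud.streaming`` ``HttpError``;
    its ``response`` attribute is a dict of response headers and its
    ``content`` attribute is the response body.
    """
    faux_response = httplib2.Response(err_response.response)
    return make_exception(faux_response, err_response.content,
                          error_info=request_url)
```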
69 changes: 69 additions & 0 deletions unit_tests/bigquery/test_table.py
@@ -1458,6 +1458,35 @@ def test_upload_from_file_size_failure(self):
        with self.assertRaises(ValueError):
            table.upload_from_file(file_obj, 'CSV', size=None)

    def test_upload_from_file_multipart_w_400(self):
        import csv
        import datetime
        from six.moves.http_client import BAD_REQUEST
        from unit_tests._testing import _NamedTemporaryFile
        from google.cloud._helpers import UTC
        from google.cloud.exceptions import BadRequest
        WHEN_TS = 1437767599.006
        WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace(
            tzinfo=UTC)
        response = {'status': BAD_REQUEST}
        conn = _Connection(
            (response, b'{}'),
        )
        client = _Client(project=self.PROJECT, connection=conn)
        dataset = _Dataset(client)
        table = self._makeOne(self.TABLE_NAME, dataset=dataset)

        with _NamedTemporaryFile() as temp:
            with open(temp.name, 'w') as file_obj:
                writer = csv.writer(file_obj)
                writer.writerow(('full_name', 'age', 'joined'))
                writer.writerow(('Phred Phlyntstone', 32, WHEN))

            with open(temp.name, 'rb') as file_obj:
                with self.assertRaises(BadRequest):
                    table.upload_from_file(
                        file_obj, 'CSV', rewind=True)

    def _upload_from_file_helper(self, **kw):
        import csv
        import datetime
@@ -1564,6 +1593,46 @@ def test_upload_from_file_w_bound_client_multipart(self):
        payload_lines = app_msg._payload.rstrip().splitlines()
        self.assertEqual(payload_lines, body_lines)

    def test_upload_from_file_resumable_with_400(self):
        import csv
        import datetime
        from six.moves.http_client import BAD_REQUEST
        from google.cloud.bigquery import table as MUT
        from google.cloud.exceptions import BadRequest
        from google.cloud._helpers import UTC
        from unit_tests._testing import _Monkey
        from unit_tests._testing import _NamedTemporaryFile
        WHEN_TS = 1437767599.006
        WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace(
            tzinfo=UTC)
        initial_response = {'status': BAD_REQUEST}
        conn = _Connection(
            (initial_response, b'{}'),
        )
        client = _Client(project=self.PROJECT, connection=conn)

        class _UploadConfig(object):
            accept = ['*/*']
            max_size = None
            resumable_multipart = True
            resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs'
            simple_multipart = True
            simple_path = u''  # force resumable

        dataset = _Dataset(client)
        table = self._makeOne(self.TABLE_NAME, dataset=dataset)

        with _Monkey(MUT, _UploadConfig=_UploadConfig):
            with _NamedTemporaryFile() as temp:
                with open(temp.name, 'w') as file_obj:
                    writer = csv.writer(file_obj)
                    writer.writerow(('full_name', 'age', 'joined'))
                    writer.writerow(('Phred Phlyntstone', 32, WHEN))

                with open(temp.name, 'rb') as file_obj:
                    with self.assertRaises(BadRequest):
                        table.upload_from_file(
                            file_obj, 'CSV', rewind=True)

    # pylint: disable=too-many-statements
    def test_upload_from_file_w_explicit_client_resumable(self):
        import json
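The two new tests drive the 400 through different routes: the multipart test returns it from the upload request itself, exercising `_check_response_error`, while the resumable test returns it from `upload.initialize_upload`, exercising the new try/except. Judging by its own `# force resumable` comment, the resumable test relies on the streaming layer falling back to `RESUMABLE_UPLOAD` whenever `simple_path` is empty. A sketch of the monkey-patch pattern it uses, with the test's own names (`_Monkey` is this project's test helper, comparable to `mock.patch.object`):

```python
from google.cloud.bigquery import table as MUT
from unit_tests._testing import _Monkey


class _UploadConfig(object):
    # Stub for the module-level upload configuration; an empty
    # simple_path leaves only the resumable strategy available.
    accept = ['*/*']
    max_size = None
    resumable_multipart = True
    resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs'
    simple_multipart = True
    simple_path = u''


# While the block is active, module code reading MUT._UploadConfig sees
# the stub; on exit the original class is restored.
with _Monkey(MUT, _UploadConfig=_UploadConfig):
    pass  # call table.upload_from_file(...) here
```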