From 02cb720035f5c31922c7aee7035179dd078cf742 Mon Sep 17 00:00:00 2001
From: Igor Bernstein
Date: Fri, 13 Jan 2017 09:47:26 -0500
Subject: [PATCH 1/2] =?UTF-8?q?Fix=20HadoopFileSource=E2=80=99s=20split=20?=
 =?UTF-8?q?size=20estimate?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

getEstimatedSizeBytes summed the length of every input file even when the
source wrapped a single split produced by splitIntoBundles. Return the
length of the wrapped split in that case, and add a test asserting that the
estimates of the splits add up to the estimate of the whole source.

---
 .../contrib/hadoop/HadoopFileSource.java      |  8 ++++++-
 .../contrib/hadoop/HadoopFileSourceTest.java  | 23 +++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java b/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
index e8981d2d6a..c166261fd6 100644
--- a/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
+++ b/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
@@ -239,12 +239,18 @@ private Coder getDefaultCoder(Class c) {
   public long getEstimatedSizeBytes(PipelineOptions options) {
     long size = 0;
     try {
+      // If this source represents a split from splitIntoBundles, then return the size of the split,
+      // rather than the entire input
+      if (serializableSplit != null) {
+        return serializableSplit.getSplit().getLength();
+      }
+
       Job job = Job.getInstance(); // new instance
       for (FileStatus st : listStatus(createFormat(job), job)) {
         size += st.getLen();
       }
     } catch (IOException | NoSuchMethodException | InvocationTargetException
-        | IllegalAccessException | InstantiationException e) {
+        | IllegalAccessException | InstantiationException | InterruptedException e) {
       // ignore, and return 0
     }
     return size;
diff --git a/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java b/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java
index cef3c08348..eac54a1e31 100644
--- a/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java
+++ b/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java
@@ -152,6 +152,29 @@ public void testSplits() throws Exception {
     assertTrue(nonEmptySplits > 2);
   }
 
+  @Test
+  public void testSplitEstimatedSize() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+    File file = createFileWithData("tmp.avro", expectedResults);
+
+    HadoopFileSource<IntWritable, Text> source = HadoopFileSource.from(
+        file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class
+    );
+
+    long originalSize = source.getEstimatedSizeBytes(options);
+    long splitTotalSize = 0;
+    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.splitIntoBundles(
+        SequenceFile.SYNC_INTERVAL, options
+    );
+    for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
+      splitTotalSize += splitSource.getEstimatedSizeBytes(options);
+    }
+    // Assert that the estimated size of the whole is the sum of its parts
+    assertEquals(originalSize, splitTotalSize);
+  }
+
   private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
       throws IOException {
     File tmpFile = tmpFolder.newFile(filename);

From 73a1ea6757aabd34f6e8b8480b2ee4a2711f2529 Mon Sep 17 00:00:00 2001
From: Igor Bernstein
Date: Thu, 26 Jan 2017 14:29:52 -0500
Subject: [PATCH 2/2] Properly set interrupted state

Catch InterruptedException separately in getEstimatedSizeBytes and restore
the thread's interrupt flag instead of swallowing the interruption together
with the other ignored exceptions.

---
 .../cloud/dataflow/contrib/hadoop/HadoopFileSource.java      | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java b/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
index c166261fd6..cffc475d71 100644
--- a/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
+++ b/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
@@ -250,7 +250,10 @@ public long getEstimatedSizeBytes(PipelineOptions options) {
         size += st.getLen();
       }
     } catch (IOException | NoSuchMethodException | InvocationTargetException
-        | IllegalAccessException | InstantiationException | InterruptedException e) {
+        | IllegalAccessException | InstantiationException e) {
+      // ignore, and return 0
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
       // ignore, and return 0
     }
     return size;