|
1 | 1 | package stroom.proxy.app.handler;
|
2 | 2 |
|
| 3 | +import stroom.meta.api.AttributeMap; |
| 4 | +import stroom.meta.api.AttributeMapUtil; |
3 | 5 | import stroom.proxy.app.DataDirProvider;
|
4 | 6 | import stroom.util.io.FileName;
|
5 | 7 | import stroom.util.io.FileUtil;
|
|
17 | 19 | import java.nio.file.Files;
|
18 | 20 | import java.nio.file.Path;
|
19 | 21 | import java.util.Iterator;
|
| 22 | +import java.util.Map; |
| 23 | +import java.util.Objects; |
20 | 24 | import java.util.concurrent.atomic.AtomicBoolean;
|
21 | 25 | import java.util.concurrent.atomic.AtomicLong;
|
22 | 26 | import java.util.function.Consumer;
|
@@ -76,23 +80,38 @@ public void addDir(final Path dir) {
|
76 | 80 | final Path tempDir = tempAggregatesDirProvider.get();
|
77 | 81 | final FileGroup outputFileGroup = new FileGroup(tempDir);
|
78 | 82 | final AtomicLong count = new AtomicLong();
|
79 |
| - final AtomicBoolean doneMeta = new AtomicBoolean(); |
| 83 | + final AttributeMap commonHeaders = new AttributeMap(); |
| 84 | + final AtomicBoolean doneFirstMeta = new AtomicBoolean(); |
80 | 85 | // Get a buffer to help us transfer data.
|
81 | 86 | final byte[] buffer = LocalByteBuffer.get();
|
82 | 87 |
|
83 | 88 | try (final ProxyZipWriter zipWriter = new ProxyZipWriter(outputFileGroup.getZip(), buffer)) {
|
84 | 89 | FileUtil.forEachChild(dir, fileGroupDir -> {
|
85 | 90 | final FileGroup fileGroup = new FileGroup(fileGroupDir);
|
86 | 91 |
|
87 |
| - // Output meta if this is the first. |
88 |
| - if (!doneMeta.get()) { |
89 |
| - try { |
90 |
| - Files.copy(fileGroup.getMeta(), outputFileGroup.getMeta()); |
91 |
| - doneMeta.set(true); |
92 |
| - } catch (final IOException e) { |
93 |
| - LOGGER.error(e::getMessage, e); |
94 |
| - throw new UncheckedIOException(e); |
| 92 | + // Combine common header keys and values. |
| 93 | + try { |
| 94 | + if (doneFirstMeta.compareAndSet(false, true)) { |
| 95 | + // Load initial common headers from the first meta. |
| 96 | + AttributeMapUtil.read(fileGroup.getMeta(), commonHeaders); |
| 97 | + } else { |
| 98 | + // Remove headers that don't exist or are different in subsequent meta files. |
| 99 | + final AttributeMap headers = new AttributeMap(); |
| 100 | + AttributeMapUtil.read(fileGroup.getMeta(), headers); |
| 101 | + final Iterator<Map.Entry<String, String>> iterator = |
| 102 | + commonHeaders.entrySet().iterator(); |
| 103 | + while (iterator.hasNext()) { |
| 104 | + final Map.Entry<String, String> entry = iterator.next(); |
| 105 | + final String otherValue = headers.get(entry.getKey()); |
| 106 | + // If this header is different then remove the common header. |
| 107 | + if (!Objects.equals(entry.getValue(), otherValue)) { |
| 108 | + iterator.remove(); |
| 109 | + } |
| 110 | + } |
95 | 111 | }
|
| 112 | + } catch (final IOException e) { |
| 113 | + LOGGER.error(e::getMessage, e); |
| 114 | + throw new UncheckedIOException(e); |
96 | 115 | }
|
97 | 116 |
|
98 | 117 | try (final ZipFile zipFile = ZipUtil.createZipFile(fileGroup.getZip())) {
|
@@ -122,6 +141,14 @@ public void addDir(final Path dir) {
|
122 | 141 | throw new UncheckedIOException(e);
|
123 | 142 | }
|
124 | 143 | });
|
| 144 | + |
| 145 | + // Now write the common header arguments to the aggregate meta file. |
| 146 | + try { |
| 147 | + AttributeMapUtil.write(commonHeaders, outputFileGroup.getMeta()); |
| 148 | + } catch (final IOException e) { |
| 149 | + LOGGER.error(e::getMessage, e); |
| 150 | + throw new UncheckedIOException(e); |
| 151 | + } |
125 | 152 | }
|
126 | 153 |
|
127 | 154 | // We have finished the merge so transfer the new item to be forwarded.
|
|
0 commit comments