From d1025fd634e79f2f384131ca2776f346aa446902 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Mon, 30 Oct 2023 18:46:31 +0000 Subject: [PATCH 1/7] [NUTCH-3017] Allow fast-urlfilter to load from HDFS/S3 and support gzipped input Signed-off-by: Julien Nioche --- .../nutch/urlfilter/fast/FastURLFilter.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java b/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java index ffcd0138a..79ad7b6ca 100644 --- a/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java +++ b/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java @@ -20,6 +20,8 @@ import com.google.common.collect.Multimap; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; import org.apache.nutch.net.URLFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,10 +29,13 @@ import java.lang.invoke.MethodHandles; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.Reader; import java.net.URL; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; +import java.util.zip.GZIPInputStream; /** * Filters URLs based on a file of regular expressions using host/domains @@ -181,9 +186,23 @@ public String filter(String url) { public void reloadRules() throws IOException { String fileRules = conf.get(URLFILTER_FAST_FILE); - try (Reader reader = conf.getConfResourceAsReader(fileRules)) { - reloadRules(reader); + + InputStream is; + + Path fileRulesPath = new Path(fileRules); + if (fileRulesPath.toUri().getScheme() != null) { + FileSystem fs = fileRulesPath.getFileSystem(conf); + is = fs.open(fileRulesPath); + } + else { + is = conf.getConfResourceAsInputStream(fileRules); } + + if (fileRules.endsWith(".gz")) { + is = new GZIPInputStream(is); + } + + reloadRules(new InputStreamReader(is)); } private void reloadRules(Reader rules) throws IOException { From bbf0867263ed1764c56fe7794c17942d0e8bf1c4 Mon Sep 17 00:00:00 2001 From: Lewis John McGibbney Date: Thu, 2 Nov 2023 20:36:43 -0700 Subject: [PATCH 2/7] NUTCH-3014 Standardize Job names (#789) --- src/java/org/apache/nutch/crawl/CrawlDb.java | 3 +- .../org/apache/nutch/crawl/CrawlDbMerger.java | 3 +- .../org/apache/nutch/crawl/CrawlDbReader.java | 20 ++++------- .../apache/nutch/crawl/DeduplicationJob.java | 3 +- .../org/apache/nutch/crawl/Generator.java | 13 +++---- src/java/org/apache/nutch/crawl/Injector.java | 2 +- src/java/org/apache/nutch/crawl/LinkDb.java | 3 +- .../org/apache/nutch/crawl/LinkDbMerger.java | 3 +- .../org/apache/nutch/crawl/LinkDbReader.java | 3 +- .../org/apache/nutch/fetcher/Fetcher.java | 2 +- .../org/apache/nutch/hostdb/ReadHostDb.java | 3 +- .../org/apache/nutch/hostdb/UpdateHostDb.java | 3 +- .../org/apache/nutch/indexer/CleaningJob.java | 4 +-- .../org/apache/nutch/indexer/IndexingJob.java | 3 +- .../org/apache/nutch/parse/ParseSegment.java | 3 +- .../nutch/scoring/webgraph/LinkDumper.java | 6 ++-- .../nutch/scoring/webgraph/LinkRank.java | 15 ++++---- .../nutch/scoring/webgraph/NodeDumper.java | 3 +- .../nutch/scoring/webgraph/ScoreUpdater.java | 3 +- .../nutch/scoring/webgraph/WebGraph.java | 9 ++--- .../apache/nutch/segment/SegmentMerger.java | 3 +- 
.../apache/nutch/segment/SegmentReader.java | 3 +- .../org/apache/nutch/tools/FreeGenerator.java | 2 +- .../nutch/tools/arc/ArcSegmentCreator.java | 9 ++--- .../apache/nutch/tools/warc/WARCExporter.java | 3 +- .../nutch/util/CrawlCompletionStats.java | 6 ++-- src/java/org/apache/nutch/util/NutchJob.java | 4 --- .../nutch/util/ProtocolStatusStatistics.java | 2 +- .../apache/nutch/util/SitemapProcessor.java | 34 +++++++++---------- .../nutch/util/domain/DomainStatistics.java | 10 +++--- .../apache/nutch/crawl/TestCrawlDbFilter.java | 3 +- .../apache/nutch/plugin/TestPluginSystem.java | 5 ++- 32 files changed, 74 insertions(+), 117 deletions(-) diff --git a/src/java/org/apache/nutch/crawl/CrawlDb.java b/src/java/org/apache/nutch/crawl/CrawlDb.java index 16394832b..2b609c0a6 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDb.java +++ b/src/java/org/apache/nutch/crawl/CrawlDb.java @@ -165,8 +165,7 @@ public static Job createJob(Configuration config, Path crawlDb) Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random() .nextInt(Integer.MAX_VALUE))); - Job job = NutchJob.getInstance(config); - job.setJobName("crawldb " + crawlDb); + Job job = Job.getInstance(config, "Nutch CrawlDb: " + crawlDb); Path current = new Path(crawlDb, CURRENT_NAME); if (current.getFileSystem(job.getConfiguration()).exists(current)) { diff --git a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java index 1bf7243d3..6ee4b43cd 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java +++ b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java @@ -165,9 +165,8 @@ public static Job createMergeJob(Configuration conf, Path output, Path newCrawlDb = new Path(output, "merge-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - Job job = NutchJob.getInstance(conf); + Job job = Job.getInstance(conf, "Nutch CrawlDbMerger: " + output); conf = job.getConfiguration(); - job.setJobName("crawldb merge " + output); job.setInputFormatClass(SequenceFileInputFormat.class); diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java index bd3e6f38d..29e8efe17 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java +++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java @@ -564,9 +564,8 @@ private TreeMap processStatJobHelper(String crawlDb, throws IOException, InterruptedException, ClassNotFoundException { Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis()); - Job job = NutchJob.getInstance(config); + Job job = Job.getInstance(config, "Nutch CrawlDbReader: " + crawlDb); config = job.getConfiguration(); - job.setJobName("stats " + crawlDb); config.setBoolean("db.reader.stats.sort", sort); FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME)); @@ -812,7 +811,7 @@ public CrawlDatum get(String crawlDb, String url, Configuration config) @Override protected int process(String line, StringBuilder output) throws Exception { - Job job = NutchJob.getInstance(getConf()); + Job job = Job.getInstance(getConf(), "Nutch CrawlDbReader: process " + crawlDb); Configuration config = job.getConfiguration(); readUrl(this.crawlDb, line, config, output); return 0; @@ -839,8 +838,7 @@ public void processDumpJob(String crawlDb, String output, Path outFolder = new Path(output); - Job job = NutchJob.getInstance(config); - job.setJobName("dump " + crawlDb); + Job job = Job.getInstance(config, "Nutch CrawlDbReader: dump " + crawlDb); Configuration jobConf = job.getConfiguration(); 
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME)); @@ -958,18 +956,15 @@ public void processTopNJob(String crawlDb, long topN, float min, String output, Configuration config) throws IOException, ClassNotFoundException, InterruptedException { - if (LOG.isInfoEnabled()) { - LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")"); - LOG.info("CrawlDb db: {}", crawlDb); - } + LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")"); + LOG.info("CrawlDb db: {}", crawlDb); Path outFolder = new Path(output); Path tempDir = new Path( config.get("mapreduce.cluster.temp.dir", ".") + "/readdb-topN-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - Job job = NutchJob.getInstance(config); - job.setJobName("topN prepare " + crawlDb); + Job job = Job.getInstance(config, "Nutch CrawlDbReader: topN prepare " + crawlDb); FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME)); job.setInputFormatClass(SequenceFileInputFormat.class); @@ -1000,8 +995,7 @@ public void processTopNJob(String crawlDb, long topN, float min, } LOG.info("CrawlDb topN: collecting topN scores."); - job = NutchJob.getInstance(config); - job.setJobName("topN collect " + crawlDb); + job = Job.getInstance(config, "Nutch CrawlDbReader: topN collect " + crawlDb); job.getConfiguration().setLong("db.reader.topn", topN); FileInputFormat.addInputPath(job, tempDir); diff --git a/src/java/org/apache/nutch/crawl/DeduplicationJob.java b/src/java/org/apache/nutch/crawl/DeduplicationJob.java index 217005d41..e37001354 100644 --- a/src/java/org/apache/nutch/crawl/DeduplicationJob.java +++ b/src/java/org/apache/nutch/crawl/DeduplicationJob.java @@ -305,9 +305,8 @@ public int run(String[] args) throws IOException { Path tempDir = new Path(crawlDb, "dedup-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - Job job = NutchJob.getInstance(getConf()); + Job job = Job.getInstance(getConf(), "Nutch DeduplicationJob: " + crawlDb); Configuration conf = job.getConfiguration(); - job.setJobName("Deduplication on " + crawlDb); conf.set(DEDUPLICATION_GROUP_MODE, group); conf.set(DEDUPLICATION_COMPARE_ORDER, compareOrder); job.setJarByClass(DeduplicationJob.class); diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java index 1b62314e7..33f743a37 100644 --- a/src/java/org/apache/nutch/crawl/Generator.java +++ b/src/java/org/apache/nutch/crawl/Generator.java @@ -388,7 +388,7 @@ private JexlContext createContext(HostDatum datum) { public void setup(Context context) throws IOException { conf = context.getConfiguration(); mos = new MultipleOutputs(context); - Job job = Job.getInstance(conf); + Job job = Job.getInstance(conf, "Nutch Generator.SelectorReducer"); limit = conf.getLong(GENERATOR_TOP_N, Long.MAX_VALUE) / job.getNumReduceTasks(); maxNumSegments = conf.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1); @@ -695,7 +695,7 @@ public Path[] generate(Path dbDir, Path segments, int numLists, long topN, long curTime) throws IOException, InterruptedException, ClassNotFoundException { - Job job = NutchJob.getInstance(getConf()); + Job job = Job.getInstance(getConf(), "Nutch Generator: generate from " + dbDir); Configuration conf = job.getConfiguration(); boolean filter = conf.getBoolean(GENERATOR_FILTER, true); boolean normalise = conf.getBoolean(GENERATOR_NORMALISE, true); @@ -839,8 +839,7 @@ public Path[] generate(Path dbDir, Path segments, int numLists, long topN, } // map to inverted subset due for fetch, sort by 
score - Job job = NutchJob.getInstance(getConf()); - job.setJobName("generate: select from " + dbDir); + Job job = Job.getInstance(getConf(), "Nutch Generator: generate from " + dbDir); Configuration conf = job.getConfiguration(); if (numLists == -1) { /* for politeness create exactly one partition per fetch task */ @@ -942,8 +941,7 @@ public Path[] generate(Path dbDir, Path segments, int numLists, long topN, Path tempDir2 = new Path(dbDir, "generate-temp-" + java.util.UUID.randomUUID().toString()); - job = NutchJob.getInstance(getConf()); - job.setJobName("generate: updatedb " + dbDir); + job = Job.getInstance(getConf(), "Nutch Generator: updatedb " + dbDir); job.getConfiguration().setLong(Nutch.GENERATE_TIME_KEY, generateTime); for (Path segmpaths : generatedSegments) { Path subGenDir = new Path(segmpaths, CrawlDatum.GENERATE_DIR_NAME); @@ -1001,8 +999,7 @@ private Path partitionSegment(Path segmentsDir, Path inputDir, int numLists) LOG.info("Generator: segment: " + segment); - Job job = NutchJob.getInstance(getConf()); - job.setJobName("generate: partition " + segment); + Job job = Job.getInstance(getConf(), "Nutch Generator: partition segment " + segment); Configuration conf = job.getConfiguration(); conf.setInt("partition.url.seed", RANDOM.nextInt()); diff --git a/src/java/org/apache/nutch/crawl/Injector.java b/src/java/org/apache/nutch/crawl/Injector.java index 9bfd1b454..0d3740eb4 100644 --- a/src/java/org/apache/nutch/crawl/Injector.java +++ b/src/java/org/apache/nutch/crawl/Injector.java @@ -404,7 +404,7 @@ public void inject(Path crawlDb, Path urlDir, boolean overwrite, Path lock = CrawlDb.lock(conf, crawlDb, false); // configure job - Job job = Job.getInstance(conf, "inject " + urlDir); + Job job = Job.getInstance(conf, "Nutch Injector: " + urlDir); job.setJarByClass(Injector.class); job.setMapperClass(InjectMapper.class); job.setReducerClass(InjectReducer.class); diff --git a/src/java/org/apache/nutch/crawl/LinkDb.java b/src/java/org/apache/nutch/crawl/LinkDb.java index 3c752ab1d..2f4a0dda4 100644 --- a/src/java/org/apache/nutch/crawl/LinkDb.java +++ b/src/java/org/apache/nutch/crawl/LinkDb.java @@ -270,9 +270,8 @@ private static Job createJob(Configuration config, Path linkDb, Path newLinkDb = new Path(linkDb, Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - Job job = NutchJob.getInstance(config); + Job job = Job.getInstance(config, "Nutch LinkDb: " + linkDb); Configuration conf = job.getConfiguration(); - job.setJobName("linkdb " + linkDb); job.setInputFormatClass(SequenceFileInputFormat.class); diff --git a/src/java/org/apache/nutch/crawl/LinkDbMerger.java b/src/java/org/apache/nutch/crawl/LinkDbMerger.java index d6a41ab48..c3da2031e 100644 --- a/src/java/org/apache/nutch/crawl/LinkDbMerger.java +++ b/src/java/org/apache/nutch/crawl/LinkDbMerger.java @@ -147,8 +147,7 @@ public static Job createMergeJob(Configuration config, Path linkDb, Path newLinkDb = new Path(linkDb, "merge-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - Job job = NutchJob.getInstance(config); - job.setJobName("linkdb merge " + linkDb); + Job job = Job.getInstance(config, "Nutch LinkDbMerger: " + linkDb); Configuration conf = job.getConfiguration(); job.setInputFormatClass(SequenceFileInputFormat.class); diff --git a/src/java/org/apache/nutch/crawl/LinkDbReader.java b/src/java/org/apache/nutch/crawl/LinkDbReader.java index fa01f20bf..9ae356683 100644 --- a/src/java/org/apache/nutch/crawl/LinkDbReader.java +++ b/src/java/org/apache/nutch/crawl/LinkDbReader.java @@ -159,8 
+159,7 @@ public void processDumpJob(String linkdb, String output, String regex) Path outFolder = new Path(output); - Job job = NutchJob.getInstance(getConf()); - job.setJobName("read " + linkdb); + Job job = Job.getInstance(getConf(), "Nutch LinkDbReader: " + linkdb); job.setJarByClass(LinkDbReader.class); Configuration conf = job.getConfiguration(); diff --git a/src/java/org/apache/nutch/fetcher/Fetcher.java b/src/java/org/apache/nutch/fetcher/Fetcher.java index 92aef6f10..d1774f530 100644 --- a/src/java/org/apache/nutch/fetcher/Fetcher.java +++ b/src/java/org/apache/nutch/fetcher/Fetcher.java @@ -498,7 +498,7 @@ public void fetch(Path segment, int threads) throws IOException, totalOutlinksToFollow); } - Job job = NutchJob.getInstance(getConf()); + Job job = Job.getInstance(getConf(), "Nutch Fetcher: " + segment.getName()); job.setJobName("FetchData"); Configuration conf = job.getConfiguration(); diff --git a/src/java/org/apache/nutch/hostdb/ReadHostDb.java b/src/java/org/apache/nutch/hostdb/ReadHostDb.java index 0321a8652..036b78650 100644 --- a/src/java/org/apache/nutch/hostdb/ReadHostDb.java +++ b/src/java/org/apache/nutch/hostdb/ReadHostDb.java @@ -181,8 +181,7 @@ private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); conf.set("mapreduce.output.textoutputformat.separator", "\t"); - Job job = Job.getInstance(conf); - job.setJobName("ReadHostDb"); + Job job = Job.getInstance(conf, "Nutch ReadHostDb"); job.setJarByClass(ReadHostDb.class); FileInputFormat.addInputPath(job, new Path(hostDb, "current")); diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java index 65e45c55d..5148a6be1 100644 --- a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java +++ b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java @@ -77,11 +77,10 @@ private void updateHostDb(Path hostDb, Path crawlDb, Path topHosts, stopWatch.start(); LOG.info("UpdateHostDb: starting"); - Job job = NutchJob.getInstance(getConf()); + Job job = Job.getInstance(getConf(), "Nutch UpdateHostDb"); Configuration conf = job.getConfiguration(); boolean preserveBackup = conf.getBoolean("db.preserve.backup", true); job.setJarByClass(UpdateHostDb.class); - job.setJobName("UpdateHostDb"); FileSystem fs = hostDb.getFileSystem(conf); Path old = new Path(hostDb, "old"); diff --git a/src/java/org/apache/nutch/indexer/CleaningJob.java b/src/java/org/apache/nutch/indexer/CleaningJob.java index 04b9c2efa..8334ac353 100644 --- a/src/java/org/apache/nutch/indexer/CleaningJob.java +++ b/src/java/org/apache/nutch/indexer/CleaningJob.java @@ -144,7 +144,7 @@ public void delete(String crawldb, boolean noCommit) stopWatch.start(); LOG.info("CleaningJob: starting"); - Job job = NutchJob.getInstance(getConf()); + Job job = Job.getInstance(getConf(), "Nutch CleaningJob: " + crawldb); Configuration conf = job.getConfiguration(); FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME)); @@ -157,8 +157,6 @@ public void delete(String crawldb, boolean noCommit) job.setReducerClass(DeleterReducer.class); job.setJarByClass(CleaningJob.class); - job.setJobName("CleaningJob"); - // need to expicitely allow deletions conf.setBoolean(IndexerMapReduce.INDEXER_DELETE, true); diff --git a/src/java/org/apache/nutch/indexer/IndexingJob.java b/src/java/org/apache/nutch/indexer/IndexingJob.java index d2115230c..c3ddb4ae9 100644 --- a/src/java/org/apache/nutch/indexer/IndexingJob.java +++ 
b/src/java/org/apache/nutch/indexer/IndexingJob.java @@ -108,7 +108,8 @@ public void index(Path crawlDb, Path linkDb, List segments, stopWatch.start(); LOG.info("Indexer: starting"); - final Job job = NutchJob.getInstance(getConf()); + final Job job = Job.getInstance(getConf(), + "Nutch IndexingJob: crawldb: " + crawlDb + " segment(s): " + segments); job.setJobName("Indexer"); Configuration conf = job.getConfiguration(); diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java b/src/java/org/apache/nutch/parse/ParseSegment.java index de45c463b..1995a880e 100644 --- a/src/java/org/apache/nutch/parse/ParseSegment.java +++ b/src/java/org/apache/nutch/parse/ParseSegment.java @@ -232,8 +232,7 @@ public void parse(Path segment) throws IOException, LOG.info("ParseSegment: starting"); LOG.info("ParseSegment: segment: {}", segment); - Job job = NutchJob.getInstance(getConf()); - job.setJobName("parse " + segment); + Job job = Job.getInstance(getConf(), "Nutch ParseSegment: " + segment); Configuration conf = job.getConfiguration(); FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME)); diff --git a/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java b/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java index 4831d73f3..439d7438c 100644 --- a/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java +++ b/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java @@ -341,8 +341,7 @@ public void dumpLinks(Path webGraphDb) throws IOException, // run the inverter job Path tempInverted = new Path(webGraphDb, "inverted-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - Job inverter = NutchJob.getInstance(conf); - inverter.setJobName("LinkDumper: inverter"); + Job inverter = Job.getInstance(conf, "Nutch LinkDumper: invert " + webGraphDb); FileInputFormat.addInputPath(inverter, nodeDb); FileInputFormat.addInputPath(inverter, outlinkDb); inverter.setInputFormatClass(SequenceFileInputFormat.class); @@ -372,8 +371,7 @@ public void dumpLinks(Path webGraphDb) throws IOException, } // run the merger job - Job merger = NutchJob.getInstance(conf); - merger.setJobName("LinkDumper: merger"); + Job merger = Job.getInstance(conf, "Nutch LinkDumper: merge " + tempInverted); FileInputFormat.addInputPath(merger, tempInverted); merger.setJarByClass(Merger.class); merger.setInputFormatClass(SequenceFileInputFormat.class); diff --git a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java index c226ad130..e48f04acd 100644 --- a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java +++ b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java @@ -93,9 +93,8 @@ private int runCounter(FileSystem fs, Path webGraphDb) throws IOException, // configure the counter job Path numLinksPath = new Path(webGraphDb, NUM_NODES); Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR); - Job counter = NutchJob.getInstance(getConf()); + Job counter = Job.getInstance(getConf(), "Nutch LinkRank: counter " + webGraphDb); Configuration conf = counter.getConfiguration(); - counter.setJobName("LinkRank Counter"); FileInputFormat.addInputPath(counter, nodeDb); FileOutputFormat.setOutputPath(counter, numLinksPath); counter.setInputFormatClass(SequenceFileInputFormat.class); @@ -194,9 +193,8 @@ private void runInitializer(Path nodeDb, Path output) throws IOException, InterruptedException, ClassNotFoundException { // configure the initializer - Job initializer = NutchJob.getInstance(getConf()); + Job initializer = 
Job.getInstance(getConf(), "Nutch LinkRank: initializer " + nodeDb); Configuration conf = initializer.getConfiguration(); - initializer.setJobName("LinkAnalysis Initializer"); FileInputFormat.addInputPath(initializer, nodeDb); FileOutputFormat.setOutputPath(initializer, output); initializer.setJarByClass(Initializer.class); @@ -245,9 +243,9 @@ private void runInverter(Path nodeDb, Path outlinkDb, Path output) throws IOException, InterruptedException, ClassNotFoundException { // configure the inverter - Job inverter = NutchJob.getInstance(getConf()); + Job inverter = Job.getInstance(getConf(), + "Nutch Linkrank: inverter nodedb: " + nodeDb + " outlinkdb: " + outlinkDb); Configuration conf = inverter.getConfiguration(); - inverter.setJobName("LinkAnalysis Inverter"); FileInputFormat.addInputPath(inverter, nodeDb); FileInputFormat.addInputPath(inverter, outlinkDb); FileOutputFormat.setOutputPath(inverter, output); @@ -305,11 +303,10 @@ private void runAnalysis(Path nodeDb, Path inverted, Path output, int iteration, int numIterations, float rankOne) throws IOException, InterruptedException, ClassNotFoundException { - Job analyzer = NutchJob.getInstance(getConf()); + Job analyzer = Job.getInstance(getConf(), + "Nutch LinkRank: analysis iteration" + (iteration + 1) + " of " + numIterations); Configuration conf = analyzer.getConfiguration(); conf.set("link.analyze.iteration", String.valueOf(iteration + 1)); - analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1) - + " of " + numIterations); FileInputFormat.addInputPath(analyzer, nodeDb); FileInputFormat.addInputPath(analyzer, inverted); FileOutputFormat.setOutputPath(analyzer, output); diff --git a/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java b/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java index dfccccc19..9277df8f6 100644 --- a/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java +++ b/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java @@ -298,9 +298,8 @@ public void dumpNodes(Path webGraphDb, DumpType type, long topN, Path output, LOG.info("NodeDumper: starting"); Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR); - Job dumper = NutchJob.getInstance(getConf()); + Job dumper = Job.getInstance(getConf(), "Nutch NodeDumper: " + webGraphDb); Configuration conf = dumper.getConfiguration(); - dumper.setJobName("NodeDumper: " + webGraphDb); FileInputFormat.addInputPath(dumper, nodeDb); dumper.setInputFormatClass(SequenceFileInputFormat.class); diff --git a/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java b/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java index c10a6e37b..bcd534274 100644 --- a/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java +++ b/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java @@ -170,8 +170,7 @@ public void update(Path crawlDb, Path webGraphDb) throws IOException, .nextInt(Integer.MAX_VALUE))); // run the updater job outputting to the temp crawl database - Job updater = NutchJob.getInstance(conf); - updater.setJobName("Update CrawlDb from WebGraph"); + Job updater = Job.getInstance(conf, "Nutch ScoreUpdater: " + crawlDb); FileInputFormat.addInputPath(updater, crawlDbCurrent); FileInputFormat.addInputPath(updater, nodeDb); FileOutputFormat.setOutputPath(updater, newCrawlDb); diff --git a/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java b/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java index b98329d1e..25e3cf230 100644 --- a/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java +++ 
b/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java @@ -545,9 +545,8 @@ public void createWebGraph(Path webGraphDb, Path[] segments, Path tempOutlinkDb = new Path(outlinkDb + "-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - Job outlinkJob = NutchJob.getInstance(getConf()); + Job outlinkJob = Job.getInstance(getConf(), "Nutch WebGraph: outlinkdb " + outlinkDb); Configuration outlinkJobConf = outlinkJob.getConfiguration(); - outlinkJob.setJobName("Outlinkdb: " + outlinkDb); boolean deleteGone = outlinkJobConf.getBoolean("link.delete.gone", false); boolean preserveBackup = outlinkJobConf.getBoolean("db.preserve.backup", true); @@ -625,9 +624,8 @@ public void createWebGraph(Path webGraphDb, Path[] segments, Path tempInlinkDb = new Path(inlinkDb + "-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - Job inlinkJob = NutchJob.getInstance(getConf()); + Job inlinkJob = Job.getInstance(getConf(), "Nutch WebGraph: inlinkdb " + inlinkDb); Configuration inlinkJobConf = inlinkJob.getConfiguration(); - inlinkJob.setJobName("Inlinkdb " + inlinkDb); LOG.info("InlinkDb: adding input: " + outlinkDb); FileInputFormat.addInputPath(inlinkJob, outlinkDb); inlinkJob.setInputFormatClass(SequenceFileInputFormat.class); @@ -669,9 +667,8 @@ public void createWebGraph(Path webGraphDb, Path[] segments, Path tempNodeDb = new Path(nodeDb + "-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - Job nodeJob = NutchJob.getInstance(getConf()); + Job nodeJob = Job.getInstance(getConf(), "Nutch WebGraph: nodedb " + nodeDb); Configuration nodeJobConf = nodeJob.getConfiguration(); - nodeJob.setJobName("NodeDb " + nodeDb); LOG.info("NodeDb: adding input: " + outlinkDb); LOG.info("NodeDb: adding input: " + inlinkDb); FileInputFormat.addInputPath(nodeJob, outlinkDb); diff --git a/src/java/org/apache/nutch/segment/SegmentMerger.java b/src/java/org/apache/nutch/segment/SegmentMerger.java index c884dfedf..53bdee22e 100644 --- a/src/java/org/apache/nutch/segment/SegmentMerger.java +++ b/src/java/org/apache/nutch/segment/SegmentMerger.java @@ -625,9 +625,8 @@ public void merge(Path out, Path[] segs, boolean filter, boolean normalize, long slice) throws IOException, ClassNotFoundException, InterruptedException { String segmentName = Generator.generateSegmentName(); LOG.info("Merging {} segments to {}/{}", segs.length, out, segmentName); - Job job = NutchJob.getInstance(getConf()); + Job job = Job.getInstance(getConf(), "Nutch SegmentMerger: " + out + "/" + segmentName); Configuration conf = job.getConfiguration(); - job.setJobName("mergesegs " + out + "/" + segmentName); conf.setBoolean("segment.merger.filter", filter); conf.setBoolean("segment.merger.normalizer", normalize); conf.setLong("segment.merger.slice", slice); diff --git a/src/java/org/apache/nutch/segment/SegmentReader.java b/src/java/org/apache/nutch/segment/SegmentReader.java index ee5c266fd..bef980060 100644 --- a/src/java/org/apache/nutch/segment/SegmentReader.java +++ b/src/java/org/apache/nutch/segment/SegmentReader.java @@ -200,8 +200,7 @@ public void dump(Path segment, Path output) throws IOException, LOG.info("SegmentReader: dump segment: {}", segment); - Job job = NutchJob.getInstance(getConf()); - job.setJobName("read " + segment); + Job job = Job.getInstance(getConf(), "Nutch SegmentReader: " + segment); Configuration conf = job.getConfiguration(); if (ge) diff --git a/src/java/org/apache/nutch/tools/FreeGenerator.java b/src/java/org/apache/nutch/tools/FreeGenerator.java index e9f5c8761..9ace8f192 100644 --- 
a/src/java/org/apache/nutch/tools/FreeGenerator.java +++ b/src/java/org/apache/nutch/tools/FreeGenerator.java @@ -184,7 +184,7 @@ public int run(String[] args) throws Exception { stopWatch.start(); LOG.info("FreeGenerator: starting"); - Job job = NutchJob.getInstance(getConf()); + Job job = Job.getInstance(getConf(), "Nutch FreeGenerator: " + args[0]); Configuration conf = job.getConfiguration(); conf.setBoolean(FILTER_KEY, filter); conf.setBoolean(NORMALIZE_KEY, normalize); diff --git a/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java b/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java index 825e752cc..311675310 100644 --- a/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java +++ b/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java @@ -371,14 +371,11 @@ public void createSegments(Path arcFiles, Path segmentsOutDir) StopWatch stopWatch = new StopWatch(); stopWatch.start(); - if (LOG.isInfoEnabled()) { - LOG.info("ArcSegmentCreator: starting"); - LOG.info("ArcSegmentCreator: arc files dir: " + arcFiles); - } + LOG.info("ArcSegmentCreator: starting"); + LOG.info("ArcSegmentCreator: arc files dir: " + arcFiles); - Job job = NutchJob.getInstance(getConf()); + Job job = Job.getInstance(getConf(), "Nutch ArcSegmentCreator: " + arcFiles); Configuration conf = job.getConfiguration(); - job.setJobName("ArcSegmentCreator " + arcFiles); String segName = generateSegmentName(); conf.set(Nutch.SEGMENT_NAME_KEY, segName); FileInputFormat.addInputPath(job, arcFiles); diff --git a/src/java/org/apache/nutch/tools/warc/WARCExporter.java b/src/java/org/apache/nutch/tools/warc/WARCExporter.java index 6d8a38557..4e80aac5f 100644 --- a/src/java/org/apache/nutch/tools/warc/WARCExporter.java +++ b/src/java/org/apache/nutch/tools/warc/WARCExporter.java @@ -433,8 +433,7 @@ public int generateWARC(String output, List segments, stopWatch.start(); LOG.info("WARCExporter: starting"); - final Job job = NutchJob.getInstance(getConf()); - job.setJobName("warc-exporter " + output); + final Job job = Job.getInstance(getConf(), "Nutch WARCExporter: " + output); job.getConfiguration().setBoolean(ONLY_SUCCESSFUL_RESPONSES, onlySuccessfulResponses); diff --git a/src/java/org/apache/nutch/util/CrawlCompletionStats.java b/src/java/org/apache/nutch/util/CrawlCompletionStats.java index 8696d2822..e5ee5f643 100644 --- a/src/java/org/apache/nutch/util/CrawlCompletionStats.java +++ b/src/java/org/apache/nutch/util/CrawlCompletionStats.java @@ -133,12 +133,12 @@ public int run(String[] args) throws Exception { LOG.info("CrawlCompletionStats: starting"); int mode = 0; - String jobName = "CrawlCompletionStats"; + String jobName = "Nutch CrawlCompletionStats: "; if (cli.getOptionValue("mode").equals("host")) { - jobName = "Host CrawlCompletionStats"; + jobName = jobName + "Host statistics"; mode = MODE_HOST; } else if (cli.getOptionValue("mode").equals("domain")) { - jobName = "Domain CrawlCompletionStats"; + jobName = jobName + "Domain statistics"; mode = MODE_DOMAIN; } diff --git a/src/java/org/apache/nutch/util/NutchJob.java b/src/java/org/apache/nutch/util/NutchJob.java index 068c64fef..25b894550 100644 --- a/src/java/org/apache/nutch/util/NutchJob.java +++ b/src/java/org/apache/nutch/util/NutchJob.java @@ -56,10 +56,6 @@ public NutchJob(Configuration conf, String jobName) throws IOException { } } - public static Job getInstance(Configuration conf) throws IOException { - return Job.getInstance(conf); - } - /** * Clean up the file system in case of a job failure. 
* @param tempDir The temporary directory which needs to be diff --git a/src/java/org/apache/nutch/util/ProtocolStatusStatistics.java b/src/java/org/apache/nutch/util/ProtocolStatusStatistics.java index 0fe6c57d0..f4e8a1b91 100644 --- a/src/java/org/apache/nutch/util/ProtocolStatusStatistics.java +++ b/src/java/org/apache/nutch/util/ProtocolStatusStatistics.java @@ -89,7 +89,7 @@ public int run(String[] args) throws Exception { stopWatch.start(); LOG.info("ProtocolStatistics: starting"); - String jobName = "ProtocolStatistics"; + String jobName = "Nutch ProtocolStatusStatistics: " + inputDir; Configuration conf = getConf(); conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); diff --git a/src/java/org/apache/nutch/util/SitemapProcessor.java b/src/java/org/apache/nutch/util/SitemapProcessor.java index 66fa9b0e7..043e77f69 100644 --- a/src/java/org/apache/nutch/util/SitemapProcessor.java +++ b/src/java/org/apache/nutch/util/SitemapProcessor.java @@ -383,7 +383,7 @@ public void sitemap(Path crawldb, Path hostdb, Path sitemapUrlDir, boolean stric conf.setBoolean(SITEMAP_URL_NORMALIZING, normalize); conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); - Job job = Job.getInstance(conf, "SitemapProcessor_" + crawldb.toString()); + Job job = Job.getInstance(conf, "Nutch SitemapProcessor: " + crawldb.toString()); job.setJarByClass(SitemapProcessor.class); // add crawlDb, sitemap url directory and hostDb to input paths @@ -431,23 +431,21 @@ public void sitemap(Path crawldb, Path hostdb, Path sitemapUrlDir, boolean stric FSUtils.replace(fs, current, tempCrawlDb, true); LockUtil.removeLockFile(fs, lock); - if (LOG.isInfoEnabled()) { - long filteredRecords = job.getCounters().findCounter("Sitemap", "filtered_records").getValue(); - long fromHostname = job.getCounters().findCounter("Sitemap", "sitemaps_from_hostname").getValue(); - long fromSeeds = job.getCounters().findCounter("Sitemap", "sitemap_seeds").getValue(); - long failedFetches = job.getCounters().findCounter("Sitemap", "failed_fetches").getValue(); - long newSitemapEntries = job.getCounters().findCounter("Sitemap", "new_sitemap_entries").getValue(); - - LOG.info("SitemapProcessor: Total records rejected by filters: {}", filteredRecords); - LOG.info("SitemapProcessor: Total sitemaps from host name: {}", fromHostname); - LOG.info("SitemapProcessor: Total sitemaps from seed urls: {}", fromSeeds); - LOG.info("SitemapProcessor: Total failed sitemap fetches: {}", failedFetches); - LOG.info("SitemapProcessor: Total new sitemap entries added: {}", newSitemapEntries); - - stopWatch.stop(); - LOG.info("SitemapProcessor: finished, elapsed: {} ms", stopWatch.getTime( - TimeUnit.MILLISECONDS)); - } + long filteredRecords = job.getCounters().findCounter("Sitemap", "filtered_records").getValue(); + long fromHostname = job.getCounters().findCounter("Sitemap", "sitemaps_from_hostname").getValue(); + long fromSeeds = job.getCounters().findCounter("Sitemap", "sitemap_seeds").getValue(); + long failedFetches = job.getCounters().findCounter("Sitemap", "failed_fetches").getValue(); + long newSitemapEntries = job.getCounters().findCounter("Sitemap", "new_sitemap_entries").getValue(); + + LOG.info("SitemapProcessor: Total records rejected by filters: {}", filteredRecords); + LOG.info("SitemapProcessor: Total sitemaps from host name: {}", fromHostname); + LOG.info("SitemapProcessor: Total sitemaps from seed urls: {}", fromSeeds); + LOG.info("SitemapProcessor: Total failed sitemap fetches: {}", failedFetches); + 
LOG.info("SitemapProcessor: Total new sitemap entries added: {}", newSitemapEntries); + + stopWatch.stop(); + LOG.info("SitemapProcessor: finished, elapsed: {} ms", stopWatch.getTime( + TimeUnit.MILLISECONDS)); } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error("SitemapProcessor_" + crawldb.toString(), e); NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs); diff --git a/src/java/org/apache/nutch/util/domain/DomainStatistics.java b/src/java/org/apache/nutch/util/domain/DomainStatistics.java index f77b72bc5..1843c424d 100644 --- a/src/java/org/apache/nutch/util/domain/DomainStatistics.java +++ b/src/java/org/apache/nutch/util/domain/DomainStatistics.java @@ -97,18 +97,18 @@ public int run(String[] args) throws Exception { LOG.info("DomainStatistics: starting"); int mode = 0; - String jobName = "DomainStatistics"; + String jobName = "Nutch DomainStatistics: "; if (args[2].equals("host")) { - jobName = "Host statistics"; + jobName = jobName + "Host statistics"; mode = MODE_HOST; } else if (args[2].equals("domain")) { - jobName = "Domain statistics"; + jobName = jobName + "Domain statistics"; mode = MODE_DOMAIN; } else if (args[2].equals("suffix")) { - jobName = "Suffix statistics"; + jobName = jobName + "Suffix statistics"; mode = MODE_SUFFIX; } else if (args[2].equals("tld")) { - jobName = "TLD statistics"; + jobName = jobName + "Top Level Directory statistics"; mode = MODE_TLD; } diff --git a/src/test/org/apache/nutch/crawl/TestCrawlDbFilter.java b/src/test/org/apache/nutch/crawl/TestCrawlDbFilter.java index 82fefaf16..812d4a6a8 100644 --- a/src/test/org/apache/nutch/crawl/TestCrawlDbFilter.java +++ b/src/test/org/apache/nutch/crawl/TestCrawlDbFilter.java @@ -31,7 +31,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.nutch.crawl.CrawlDBTestUtil.URLCrawlDatum; -import org.apache.nutch.util.NutchJob; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -94,7 +93,7 @@ public void testUrl404Purging() throws Exception { conf.setBoolean(CrawlDbFilter.URL_NORMALIZING, true); conf.setBoolean(CrawlDbFilter.URL_FILTERING, false); conf.setInt("urlnormalizer.loop.count", 2); - Job job = NutchJob.getInstance(conf); + Job job = Job.getInstance(conf); job.setJobName("Test CrawlDbFilter"); Path current = new Path(dbDir, "current"); if (FileSystem.get(conf).exists(current)) { diff --git a/src/test/org/apache/nutch/plugin/TestPluginSystem.java b/src/test/org/apache/nutch/plugin/TestPluginSystem.java index dba7c6606..7c1362aa5 100644 --- a/src/test/org/apache/nutch/plugin/TestPluginSystem.java +++ b/src/test/org/apache/nutch/plugin/TestPluginSystem.java @@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; import org.apache.nutch.util.NutchConfiguration; -import org.apache.nutch.util.NutchJob; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -102,7 +101,7 @@ public void testLoadPlugins() { public void testRepositoryCache() throws IOException { Configuration config = NutchConfiguration.create(); PluginRepository repo = PluginRepository.get(config); - Job job = NutchJob.getInstance(config); + Job job = Job.getInstance(config); config = job.getConfiguration(); PluginRepository repo1 = PluginRepository.get(config); Assert.assertTrue(repo == repo1); @@ -111,7 +110,7 @@ public void testRepositoryCache() throws IOException { config.addResource("nutch-default.xml"); 
config.addResource("nutch-site.xml"); repo = PluginRepository.get(config); - job = NutchJob.getInstance(config); + job = Job.getInstance(config); config = job.getConfiguration(); repo1 = PluginRepository.get(config); Assert.assertTrue(repo1 != repo); From d8e66ce87328ce4bb14b0da9516faf8a9f63f818 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Mon, 6 Nov 2023 14:16:46 +0000 Subject: [PATCH 3/7] [NUTCH-3025^Curlfilter-fast to filter based on the length of the URL Signed-off-by: Julien Nioche --- src/plugin/urlfilter-fast/README.md | 5 ++++ .../nutch/urlfilter/fast/FastURLFilter.java | 26 +++++++++++++++++++ .../urlfilter/fast/TestFastURLFilter.java | 22 ++++++++++++++++ 3 files changed, 53 insertions(+) diff --git a/src/plugin/urlfilter-fast/README.md b/src/plugin/urlfilter-fast/README.md index 2e5860575..e6205fc16 100644 --- a/src/plugin/urlfilter-fast/README.md +++ b/src/plugin/urlfilter-fast/README.md @@ -73,3 +73,8 @@ the end of the line. The rules file is defined via the property `urlfilter.fast.file`, the default name is `fast-urlfilter.txt`. + +In addition to this, the filter checks that the length of the path element of the URL and its query +done not exceed the values set in the properties `urlfilter.fast.url.path.max.length` and +`urlfilter.fast.url.query.max.length` if set. + diff --git a/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java b/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java index ffcd0138a..6761d5ef7 100644 --- a/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java +++ b/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java @@ -97,9 +97,17 @@ public class FastURLFilter implements URLFilter { private Configuration conf; public static final String URLFILTER_FAST_FILE = "urlfilter.fast.file"; + public static final String URLFILTER_FAST_PATH_MAX_LENGTH = "urlfilter.fast.url.path.max.length"; + public static final String URLFILTER_FAST_QUERY_MAX_LENGTH = "urlfilter.fast.url.query.max.length"; + private Multimap hostRules = LinkedHashMultimap.create(); private Multimap domainRules = LinkedHashMultimap.create(); + /** Max allowed size of the path of a URL **/ + private int maxLengthPath = -1; + /** Max allowed size of the query of a URL **/ + private int maxLengthQuery = -1; + private static final Pattern CATCH_ALL_RULE = Pattern .compile("^\\s*DenyPath(?:Query)?\\s+\\.[*?]\\s*$"); @@ -112,6 +120,8 @@ public FastURLFilter() {} @Override public void setConf(Configuration conf) { this.conf = conf; + maxLengthPath = conf.getInt(URLFILTER_FAST_PATH_MAX_LENGTH, -1); + maxLengthQuery = conf.getInt(URLFILTER_FAST_QUERY_MAX_LENGTH, -1); try { reloadRules(); } catch (Exception e) { @@ -137,6 +147,22 @@ public String filter(String url) { e.getMessage()); return null; } + + final String path = u.getPath(); + if (maxLengthPath != -1 && path.length() > maxLengthPath) + { + LOG.debug("Rejected {} as path length {} is greater than {}", url, + path.length(), maxLengthPath); + return null; + } + + final String query = u.getQuery(); + if (maxLengthQuery != -1 && query != null && query.length() > maxLengthQuery) + { + LOG.debug("Rejected {} as query length {} is greater than {}", url, + query.length(), maxLengthQuery); + return null; + } String hostname = u.getHost(); diff --git a/src/plugin/urlfilter-fast/src/test/org/apache/nutch/urlfilter/fast/TestFastURLFilter.java 
b/src/plugin/urlfilter-fast/src/test/org/apache/nutch/urlfilter/fast/TestFastURLFilter.java index 8e01d8d3c..0b31a5ad1 100644 --- a/src/plugin/urlfilter-fast/src/test/org/apache/nutch/urlfilter/fast/TestFastURLFilter.java +++ b/src/plugin/urlfilter-fast/src/test/org/apache/nutch/urlfilter/fast/TestFastURLFilter.java @@ -16,10 +16,12 @@ */ package org.apache.nutch.urlfilter.fast; +import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.Reader; +import org.apache.hadoop.conf.Configuration; import org.apache.nutch.net.URLFilter; import org.apache.nutch.urlfilter.api.RegexURLFilterBaseTest; import org.junit.Assert; @@ -53,4 +55,24 @@ public void benchmark() { bench(800, "fast-urlfilter-benchmark.txt", "Benchmarks.urls"); } + public void lengthQueryAndPath() throws FileNotFoundException { + URLFilter filter = getURLFilter(new FileReader(SAMPLES + SEPARATOR + "fast-urlfilter-test.txt")); + Configuration conf = new Configuration(); + conf.setInt(FastURLFilter.URLFILTER_FAST_PATH_MAX_LENGTH, 50); + conf.setInt(FastURLFilter.URLFILTER_FAST_QUERY_MAX_LENGTH, 50); + filter.setConf(conf); + + StringBuilder url = new StringBuilder("http://nutch.apache.org/"); + for (int i = 0; i < 50; i++) { + url.append(i); + } + Assert.assertEquals(null, filter.filter(url.toString())); + + url = new StringBuilder("http://nutch.apache.org/path?"); + for (int i = 0; i < 50; i++) { + url.append(i); + } + + Assert.assertEquals(null, filter.filter(url.toString())); + } } From f88b9a116d6be5eea738d99af65406bdd96fd6d0 Mon Sep 17 00:00:00 2001 From: Tim Allison Date: Mon, 6 Nov 2023 12:10:43 -0500 Subject: [PATCH 4/7] NUTCH-3019 -- update Tika (#797) Update to Tika 2.9.1 --- ivy/ivy.xml | 14 ++++++++------ src/plugin/language-identifier/ivy.xml | 2 +- src/plugin/language-identifier/plugin.xml | 2 +- src/plugin/parse-tika/ivy.xml | 2 +- src/plugin/parse-tika/plugin.xml | 2 +- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/ivy/ivy.xml b/ivy/ivy.xml index e5ae3882f..054cbcc1a 100644 --- a/ivy/ivy.xml +++ b/ivy/ivy.xml @@ -36,19 +36,21 @@ - - - - + + + + - + @@ -70,7 +72,7 @@ - + diff --git a/src/plugin/language-identifier/ivy.xml b/src/plugin/language-identifier/ivy.xml index f64b97055..e22284c03 100644 --- a/src/plugin/language-identifier/ivy.xml +++ b/src/plugin/language-identifier/ivy.xml @@ -35,7 +35,7 @@ - + diff --git a/src/plugin/language-identifier/plugin.xml b/src/plugin/language-identifier/plugin.xml index dab1a52f3..94929bdbf 100644 --- a/src/plugin/language-identifier/plugin.xml +++ b/src/plugin/language-identifier/plugin.xml @@ -26,7 +26,7 @@ - + diff --git a/src/plugin/parse-tika/ivy.xml b/src/plugin/parse-tika/ivy.xml index b89e812e1..8008e3244 100644 --- a/src/plugin/parse-tika/ivy.xml +++ b/src/plugin/parse-tika/ivy.xml @@ -35,7 +35,7 @@ - + diff --git a/src/plugin/parse-tika/plugin.xml b/src/plugin/parse-tika/plugin.xml index dd4fe7fde..04afb9fac 100644 --- a/src/plugin/parse-tika/plugin.xml +++ b/src/plugin/parse-tika/plugin.xml @@ -25,7 +25,7 @@ - + From 90849124d757fb0417ea90576e88b1f55da616f1 Mon Sep 17 00:00:00 2001 From: Tim Allison Date: Mon, 6 Nov 2023 15:07:10 -0500 Subject: [PATCH 5/7] NUTCH-3020 -- ParseSegment should check for okhttp's truncation flag (#794) * ParseSegment should check for okhttp's truncated flag --- .../org/apache/nutch/parse/ParseSegment.java | 11 +++ .../apache/nutch/parse/TestParseSegment.java | 81 +++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 
src/test/org/apache/nutch/parse/TestParseSegment.java diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java b/src/java/org/apache/nutch/parse/ParseSegment.java index 1995a880e..e9f041a5f 100644 --- a/src/java/org/apache/nutch/parse/ParseSegment.java +++ b/src/java/org/apache/nutch/parse/ParseSegment.java @@ -180,6 +180,17 @@ public static boolean isTruncated(Content content) { if (metadata == null) return false; + //check for okhttp or other protocol's truncated flag + //if the flag is there, no matter the value, trust it. + if (metadata.get(Response.TRUNCATED_CONTENT) != null) { + if ("true".equals(metadata.get(Response.TRUNCATED_CONTENT))) { + LOG.info(content.getUrl() + " skipped. Protocol metadata indicates truncated content, " + + "actualSize= " + content.getContent().length); + return true; + } + return false; + } + String lengthStr = metadata.get(Response.CONTENT_LENGTH); if (lengthStr != null) lengthStr = lengthStr.trim(); diff --git a/src/test/org/apache/nutch/parse/TestParseSegment.java b/src/test/org/apache/nutch/parse/TestParseSegment.java new file mode 100644 index 000000000..dd7f4f920 --- /dev/null +++ b/src/test/org/apache/nutch/parse/TestParseSegment.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nutch.parse; + +import static junit.framework.TestCase.assertFalse; +import static junit.framework.TestCase.assertTrue; + +import java.nio.charset.StandardCharsets; + +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.net.protocols.Response; +import org.apache.nutch.protocol.Content; +import org.junit.Test; + +public class TestParseSegment { + private static byte[] BYTES = "the quick brown fox".getBytes(StandardCharsets.UTF_8); + + @Test + public void testMetadataFlag() throws Exception { + + Content content = new Content(); + Metadata metadata = new Metadata(); + metadata.set(Response.TRUNCATED_CONTENT, "true"); + content.setMetadata(metadata); + content.setContent(BYTES); + assertTrue(ParseSegment.isTruncated(content)); + + metadata.set(Response.TRUNCATED_CONTENT, "false"); + assertFalse(ParseSegment.isTruncated(content)); + + //test that truncated_content does override length field + metadata = new Metadata(); + metadata.set(Response.TRUNCATED_CONTENT, "false"); + metadata.set(Response.CONTENT_LENGTH, Integer.toString(BYTES.length + 10)); + assertFalse(ParseSegment.isTruncated(content)); + + //test that truncated_content does override length field + metadata = new Metadata(); + metadata.set(Response.TRUNCATED_CONTENT, "true"); + metadata.set(Response.CONTENT_LENGTH, Integer.toString(BYTES.length)); + assertFalse(ParseSegment.isTruncated(content)); + + } + + @Test + public void testLength() throws Exception { + Content content = new Content(); + Metadata metadata = new Metadata(); + metadata.set(Response.CONTENT_LENGTH, Integer.toString(BYTES.length)); + content.setMetadata(metadata); + content.setContent(BYTES); + assertFalse(ParseSegment.isTruncated(content)); + + metadata.set(Response.CONTENT_LENGTH, Integer.toString(BYTES.length * 2)); + assertTrue(ParseSegment.isTruncated(content)); + } + + @Test + public void testNoLengthField() { + //test return false if there is no "Length" header field + Content content = new Content(); + Metadata metadata = new Metadata(); + content.setMetadata(metadata); + content.setContent(BYTES); + assertFalse(ParseSegment.isTruncated(content)); + } +} From d764e4c1668794ba80ec1ad4d00d2a69bc80aa9c Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Tue, 7 Nov 2023 10:43:37 +0000 Subject: [PATCH 6/7] Added filtering on whole string + documented config in nutch-default + fixed tests Signed-off-by: Julien Nioche --- conf/nutch-default.xml | 24 +++++++++++++++++++ src/plugin/urlfilter-fast/README.md | 3 ++- .../nutch/urlfilter/fast/FastURLFilter.java | 24 +++++++++++++++++++ .../urlfilter/fast/TestFastURLFilter.java | 24 +++++++++++++++---- 4 files changed, 69 insertions(+), 6 deletions(-) diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml index d8bf76486..8d97dca9d 100644 --- a/conf/nutch-default.xml +++ b/conf/nutch-default.xml @@ -1876,6 +1876,30 @@ CAUTION: Set the parser.timeout to -1 or a bigger value than 30, when using this used by urlfilter-fast (FastURLFilter) plugin. + + urlfilter.fast.url.max.length + -1 + Filters URLs based on their overall length. + The default value of -1 means that it is deactivated. + + + + + urlfilter.fast.url.path.max.length + -1 + Filters URLs based on the length of their path element. + The default value of -1 means that it is deactivated. + + + + + urlfilter.fast.url.query.max.length + -1 + Filters URLs based on the length of their query element. + The default value of -1 means that it is deactivated. 
+ + + urlfilter.order diff --git a/src/plugin/urlfilter-fast/README.md b/src/plugin/urlfilter-fast/README.md index e6205fc16..b4b0dfcd9 100644 --- a/src/plugin/urlfilter-fast/README.md +++ b/src/plugin/urlfilter-fast/README.md @@ -76,5 +76,6 @@ the default name is `fast-urlfilter.txt`. In addition to this, the filter checks that the length of the path element of the URL and its query done not exceed the values set in the properties `urlfilter.fast.url.path.max.length` and -`urlfilter.fast.url.query.max.length` if set. +`urlfilter.fast.url.query.max.length` if set. The overall length of the URL can also be used for +filtering through the config `urlfilter.fast.url.max.length`. diff --git a/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java b/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java index 6761d5ef7..ab905c19d 100644 --- a/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java +++ b/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java @@ -97,6 +97,7 @@ public class FastURLFilter implements URLFilter { private Configuration conf; public static final String URLFILTER_FAST_FILE = "urlfilter.fast.file"; + public static final String URLFILTER_FAST_MAX_LENGTH = "urlfilter.fast.url.max.length"; public static final String URLFILTER_FAST_PATH_MAX_LENGTH = "urlfilter.fast.url.path.max.length"; public static final String URLFILTER_FAST_QUERY_MAX_LENGTH = "urlfilter.fast.url.query.max.length"; @@ -107,21 +108,34 @@ public class FastURLFilter implements URLFilter { private int maxLengthPath = -1; /** Max allowed size of the query of a URL **/ private int maxLengthQuery = -1; + /** Max allowed size for the whole URL **/ + private int maxLength = -1; private static final Pattern CATCH_ALL_RULE = Pattern .compile("^\\s*DenyPath(?:Query)?\\s+\\.[*?]\\s*$"); public FastURLFilter() {} + /** Used by the tests so that the rules file doesn't have to be in the jar **/ FastURLFilter(Reader rules) throws IOException, PatternSyntaxException { reloadRules(rules); } + + /** Used by the tests so that the rules file doesn't have to be in the jar AND + * we can set the conf for the length-based filtering **/ + FastURLFilter(Reader rules, Configuration conf) throws IOException, PatternSyntaxException { + maxLengthPath = conf.getInt(URLFILTER_FAST_PATH_MAX_LENGTH, -1); + maxLengthQuery = conf.getInt(URLFILTER_FAST_QUERY_MAX_LENGTH, -1); + maxLength = conf.getInt(URLFILTER_FAST_MAX_LENGTH, -1); + reloadRules(rules); + } @Override public void setConf(Configuration conf) { this.conf = conf; maxLengthPath = conf.getInt(URLFILTER_FAST_PATH_MAX_LENGTH, -1); maxLengthQuery = conf.getInt(URLFILTER_FAST_QUERY_MAX_LENGTH, -1); + maxLength = conf.getInt(URLFILTER_FAST_MAX_LENGTH, -1); try { reloadRules(); } catch (Exception e) { @@ -138,6 +152,12 @@ public Configuration getConf() { @Override public String filter(String url) { + if (maxLength != -1 && url.length() > maxLength) { + LOG.debug("Rejected {} because URL length ({}) greater than limit {}", url, + url.length(), maxLength); + return null; + } + URL u; try { @@ -209,6 +229,10 @@ public void reloadRules() throws IOException { String fileRules = conf.get(URLFILTER_FAST_FILE); try (Reader reader = conf.getConfResourceAsReader(fileRules)) { reloadRules(reader); + } catch (Exception e) { + String message = "Couldn't load the rules from "+fileRules; + LOG.error(message); + throw new IOException(message); } } diff --git 
a/src/plugin/urlfilter-fast/src/test/org/apache/nutch/urlfilter/fast/TestFastURLFilter.java b/src/plugin/urlfilter-fast/src/test/org/apache/nutch/urlfilter/fast/TestFastURLFilter.java index 0b31a5ad1..75b37250e 100644 --- a/src/plugin/urlfilter-fast/src/test/org/apache/nutch/urlfilter/fast/TestFastURLFilter.java +++ b/src/plugin/urlfilter-fast/src/test/org/apache/nutch/urlfilter/fast/TestFastURLFilter.java @@ -16,10 +16,10 @@ */ package org.apache.nutch.urlfilter.fast; -import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.Reader; +import java.io.StringReader; import org.apache.hadoop.conf.Configuration; import org.apache.nutch.net.URLFilter; @@ -27,7 +27,6 @@ import org.junit.Assert; import org.junit.Test; - public class TestFastURLFilter extends RegexURLFilterBaseTest { @Override @@ -55,12 +54,13 @@ public void benchmark() { bench(800, "fast-urlfilter-benchmark.txt", "Benchmarks.urls"); } - public void lengthQueryAndPath() throws FileNotFoundException { - URLFilter filter = getURLFilter(new FileReader(SAMPLES + SEPARATOR + "fast-urlfilter-test.txt")); + @Test + public void lengthQueryAndPath() throws Exception { Configuration conf = new Configuration(); conf.setInt(FastURLFilter.URLFILTER_FAST_PATH_MAX_LENGTH, 50); conf.setInt(FastURLFilter.URLFILTER_FAST_QUERY_MAX_LENGTH, 50); - filter.setConf(conf); + // not interested in testing rules + URLFilter filter = new FastURLFilter(new StringReader(""), conf); StringBuilder url = new StringBuilder("http://nutch.apache.org/"); for (int i = 0; i < 50; i++) { @@ -75,4 +75,18 @@ public void lengthQueryAndPath() throws FileNotFoundException { Assert.assertEquals(null, filter.filter(url.toString())); } + + @Test + public void overalLengthTest() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(FastURLFilter.URLFILTER_FAST_MAX_LENGTH, 100); + // not interested in testing rules + URLFilter filter = new FastURLFilter(new StringReader(""), conf); + + StringBuilder url = new StringBuilder("http://nutch.apache.org/"); + for (int i = 0; i < 500; i++) { + url.append(i); + } + Assert.assertEquals(null, filter.filter(url.toString())); + } } From ac383fc5125b6c114a23ef996558ead57e873970 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Wed, 8 Nov 2023 12:24:24 +0100 Subject: [PATCH 7/7] [NUTCH-3017] Allow fast-urlfilter to load from HDFS/S3 and support gzipped input - use Hadoop-provided compression codecs - update description of property urlfilter.fast.file --- conf/nutch-default.xml | 10 ++++++++-- .../apache/nutch/urlfilter/fast/FastURLFilter.java | 14 ++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml index d8bf76486..b20afdfe3 100644 --- a/conf/nutch-default.xml +++ b/conf/nutch-default.xml @@ -1872,8 +1872,14 @@ CAUTION: Set the parser.timeout to -1 or a bigger value than 30, when using this urlfilter.fast.file fast-urlfilter.txt - Name of file on CLASSPATH containing regular expressions - used by urlfilter-fast (FastURLFilter) plugin. + Name of file containing rules and regular expressions + used by urlfilter-fast (FastURLFilter) plugin. If the filename + includes a scheme (for example, hdfs://) it is loaded using the + Hadoop FileSystem implementation supporting that scheme. If the + filename does not contain a scheme, the file is loaded from + CLASSPATH. If indicated by file extension (.gz, .bzip2, .zst), + the file is decompressed while reading using Hadoop-provided + compression codecs. 
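(Usage illustration, not part of the patch: with the property described above, a deployment could point the filter at a compressed rules file held on a Hadoop filesystem through a nutch-site.xml override along the lines of the sketch below. The path is hypothetical; any scheme backed by a Hadoop FileSystem implementation, e.g. s3a://, should behave the same way, and the .gz suffix is what makes the Hadoop codec factory decompress the stream while reading.)

<!-- Hypothetical nutch-site.xml override: load the fast-urlfilter rules
     from a gzipped file on HDFS instead of from the CLASSPATH.
     The path below is an example only. -->
<property>
  <name>urlfilter.fast.file</name>
  <value>hdfs:///user/nutch/conf/fast-urlfilter.txt.gz</value>
</property>
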
diff --git a/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java b/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java index 79ad7b6ca..bb4a11b7c 100644 --- a/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java +++ b/src/plugin/urlfilter-fast/src/java/org/apache/nutch/urlfilter/fast/FastURLFilter.java @@ -21,6 +21,8 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.nutch.net.URLFilter; import org.slf4j.Logger; @@ -35,7 +37,6 @@ import java.net.URL; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; -import java.util.zip.GZIPInputStream; /** * Filters URLs based on a file of regular expressions using host/domains @@ -120,7 +121,7 @@ public void setConf(Configuration conf) { try { reloadRules(); } catch (Exception e) { - LOG.error(e.getMessage()); + LOG.error("Failed to load rules: {}", e.getMessage() ); throw new RuntimeException(e.getMessage(), e); } } @@ -193,13 +194,14 @@ public void reloadRules() throws IOException { if (fileRulesPath.toUri().getScheme() != null) { FileSystem fs = fileRulesPath.getFileSystem(conf); is = fs.open(fileRulesPath); - } - else { + } else { is = conf.getConfResourceAsInputStream(fileRules); } - if (fileRules.endsWith(".gz")) { - is = new GZIPInputStream(is); + CompressionCodec codec = new CompressionCodecFactory(conf) + .getCodec(fileRulesPath); + if (codec != null) { + is = codec.createInputStream(is); } reloadRules(new InputStreamReader(is));
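
(Usage illustration for the length-based checks introduced earlier in this series: the three properties added by patches 3 and 6 could be enabled with a nutch-site.xml override such as the sketch below. The limits shown are arbitrary examples; each check stays disabled at its default of -1.)

<!-- Hypothetical nutch-site.xml overrides enabling the new length checks.
     Values are examples only; -1 (the default) leaves a check disabled. -->
<property>
  <name>urlfilter.fast.url.max.length</name>
  <value>2048</value>
  <description>Example: reject any URL longer than 2048 characters overall.</description>
</property>
<property>
  <name>urlfilter.fast.url.path.max.length</name>
  <value>512</value>
  <description>Example: reject URLs whose path exceeds 512 characters.</description>
</property>
<property>
  <name>urlfilter.fast.url.query.max.length</name>
  <value>512</value>
  <description>Example: reject URLs whose query string exceeds 512 characters.</description>
</property>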