Skip to content

Commit 793e18b

Browse files
committed
GEOMESA-2299 Fixing Bigtable SpatialRDDProvider (#1974)
Signed-off-by: Emilio Lahr-Vivaz <[email protected]>
1 parent 77765c9 commit 793e18b

File tree

2 files changed

+31
-18
lines changed

2 files changed

+31
-18
lines changed

geomesa-bigtable/geomesa-bigtable-spark/src/main/scala/org/locationtech/geomesa/bigtable/spark/BigtableSparkRDDProvider.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ import org.apache.spark.SparkContext
1919
import org.geotools.data.{DataStoreFinder, Query}
2020
import org.geotools.filter.text.ecql.ECQL
2121
import org.locationtech.geomesa.bigtable.data.BigtableDataStoreFactory
22+
import org.locationtech.geomesa.filter.factory.FastFilterFactory
2223
import org.locationtech.geomesa.hbase.data.{EmptyPlan, HBaseDataStore}
23-
import org.locationtech.geomesa.hbase.index.HBaseFeatureIndex
24+
import org.locationtech.geomesa.hbase.index.{HBaseFeatureIndex, HBaseIndexAdapter}
2425
import org.locationtech.geomesa.hbase.jobs.HBaseGeoMesaRecordReader
2526
import org.locationtech.geomesa.jobs.GeoMesaConfigurator
2627
import org.locationtech.geomesa.spark.SpatialRDD
@@ -92,11 +93,11 @@ class GeoMesaBigtableInputFormat extends InputFormat[Text, SimpleFeature] {
9293
var delegate: BigtableInputFormat = _
9394

9495
var sft: SimpleFeatureType = _
95-
var table: HBaseFeatureIndex = _
96+
var table: HBaseIndexAdapter = _
9697

97-
private def init(conf: Configuration) = if (sft == null) {
98+
private def init(conf: Configuration): Unit = if (sft == null) {
9899
sft = GeoMesaConfigurator.getSchema(conf)
99-
table = HBaseFeatureIndex.index(GeoMesaConfigurator.getIndexIn(conf))
100+
table = HBaseFeatureIndex.index(GeoMesaConfigurator.getIndexIn(conf)).asInstanceOf[HBaseIndexAdapter]
100101
delegate = new BigtableInputFormat(TableName.valueOf(GeoMesaConfigurator.getTable(conf)))
101102
delegate.setConf(conf)
102103
// see TableMapReduceUtil.java
@@ -117,11 +118,10 @@ class GeoMesaBigtableInputFormat extends InputFormat[Text, SimpleFeature] {
117118
context: TaskAttemptContext): RecordReader[Text, SimpleFeature] = {
118119
init(context.getConfiguration)
119120
val rr = delegate.createRecordReader(split, context)
120-
val transformSchema = GeoMesaConfigurator.getTransformSchema(context.getConfiguration)
121-
val q = GeoMesaConfigurator.getFilter(context.getConfiguration).map { f => ECQL.toFilter(f) }
122-
new HBaseGeoMesaRecordReader(sft, table, rr, q, transformSchema)
121+
val transform = GeoMesaConfigurator.getTransformSchema(context.getConfiguration)
122+
val ecql = GeoMesaConfigurator.getFilter(context.getConfiguration).map(FastFilterFactory.toFilter(sft, _))
123+
new HBaseGeoMesaRecordReader(table, sft, ecql, transform, rr, false)
123124
}
124-
125125
}
126126

127127
object BigtableInputFormat {
@@ -133,7 +133,7 @@ class BigtableInputFormat(val name: TableName) extends BigtableInputFormatBase w
133133
setName(name)
134134

135135
/** The configuration. */
136-
private var conf: Configuration = null
136+
private var conf: Configuration = _
137137

138138

139139
/**
@@ -161,4 +161,3 @@ class BigtableInputFormat(val name: TableName) extends BigtableInputFormatBase w
161161
setScans(s)
162162
}
163163
}
164-

geomesa-hbase/geomesa-hbase-jobs/src/main/scala/org/locationtech/geomesa/hbase/jobs/GeoMesaHBaseInputFormat.scala

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,12 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable
1616
import org.apache.hadoop.hbase.mapreduce.{MultiTableInputFormat, TableInputFormat}
1717
import org.apache.hadoop.io.Text
1818
import org.apache.hadoop.mapreduce._
19-
import org.geotools.filter.identity.FeatureIdImpl
20-
import org.geotools.process.vector.TransformProcess
19+
import org.locationtech.geomesa.filter.factory.FastFilterFactory
2120
import org.locationtech.geomesa.hbase.data.HBaseConnectionPool
2221
import org.locationtech.geomesa.hbase.index.{HBaseFeatureIndex, HBaseIndexAdapter}
2322
import org.locationtech.geomesa.jobs.GeoMesaConfigurator
2423
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
25-
26-
import scala.util.control.NonFatal
24+
import org.opengis.filter.Filter
2725

2826
/**
2927
* Input format that allows processing of simple features from GeoMesa based on a CQL query
@@ -59,18 +57,23 @@ class GeoMesaHBaseInputFormat extends InputFormat[Text, SimpleFeature] with Lazy
5957
context: TaskAttemptContext): RecordReader[Text, SimpleFeature] = {
6058
init(context.getConfiguration)
6159
val rr = delegate.createRecordReader(split, context)
60+
val ecql = GeoMesaConfigurator.getFilter(context.getConfiguration).map(FastFilterFactory.toFilter(sft, _))
6261
val transform = GeoMesaConfigurator.getTransformSchema(context.getConfiguration)
63-
// transforms are pushed down in HBase
64-
new HBaseGeoMesaRecordReader(table, sft, transform, rr)
62+
// TODO GEOMESA-2300 support local filtering
63+
new HBaseGeoMesaRecordReader(table, sft, ecql, transform, rr, true)
6564
}
6665
}
6766

6867
class HBaseGeoMesaRecordReader(table: HBaseIndexAdapter,
6968
sft: SimpleFeatureType,
69+
ecql: Option[Filter],
7070
transform: Option[SimpleFeatureType],
71-
reader: RecordReader[ImmutableBytesWritable, Result])
71+
reader: RecordReader[ImmutableBytesWritable, Result],
72+
remoteFiltering: Boolean)
7273
extends RecordReader[Text, SimpleFeature] with LazyLogging {
7374

75+
import scala.collection.JavaConverters._
76+
7477
private val results = new Iterator[Result] {
7578

7679
private var current: Result = _
@@ -93,7 +96,18 @@ class HBaseGeoMesaRecordReader(table: HBaseIndexAdapter,
9396
}
9497
}
9598

96-
private val features = table.resultsToFeatures(sft, transform.getOrElse(sft))(results)
99+
private val features =
100+
if (remoteFiltering) {
101+
// transforms and filter are pushed down, so we don't have to deal with them here
102+
table.resultsToFeatures(sft, transform.getOrElse(sft))(results)
103+
} else {
104+
// TODO GEOMESA-2300 this doesn't handle anything beyond simple attribute projection
105+
val transforms = transform.map { tsft =>
106+
(tsft.getAttributeDescriptors.asScala.map(d => s"${d.getLocalName}=${d.getLocalName}").mkString(";"), tsft)
107+
}
108+
table.resultsToFeatures(sft, ecql, transforms)(results)
109+
}
110+
97111
private var staged: SimpleFeature = _
98112

99113
override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = reader.initialize(split, context)

0 commit comments

Comments
 (0)