
Priority Queue Serialization Error #3

Open

LaurelStan opened this issue May 31, 2018 · 31 comments

@LaurelStan

In ModelTest.scala it is documented that:
"// This is unable to be run currently due to some PriorityQueue serialization issues"
This is the error I am running into. Are there any workarounds for this? I am currently unable to print any results without a solution to this problem.

@howie

howie commented Jun 7, 2018

Hi
I also have this problem.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 946.0 in stage 16.0 (TID 1971) had a not serializable result: scala.collection.mutable.PriorityQueue$$anon$3
Serialization stack:
- object not serializable (class: scala.collection.mutable.PriorityQueue$$anon$3, value: non-empty iterator)

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:921)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:919)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:919)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)

@namitk
Contributor

namitk commented Jun 8, 2018

Apologies for the delay in responding. Could you please provide a minimal snippet of code that can reproduce the problem?

@LaurelStan
Author

It occurred when I tried to use your ModelTest.scala code.

@joerenner

I'm getting the same issue using the Scala 2.10 version. @LaurelStan, are you using Scala 2.10? I think it may be related to https://issues.scala-lang.org/browse/SI-7568
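
To isolate it outside Spark: here is a minimal sketch (my own, not project code) showing that on Scala 2.11+ the queue itself serializes, but the iterator it hands out does not, which matches the non-serializable anonymous iterator class in the stack trace above.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

// Minimal repro sketch: serialize the queue, then its iterator.
object PriorityQueueIteratorRepro {
  def main(args: Array[String]): Unit = {
    val pq = mutable.PriorityQueue(3, 1, 2)
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(pq)          // succeeds on 2.11.0-M7+ once SI-7568 is fixed
    out.writeObject(pq.iterator) // throws java.io.NotSerializableException
  }
}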

@howie

howie commented Jun 13, 2018

@joerenner I use this library on Scala 2.11 (Spark 2.3.0) and still get this problem.

@howie

howie commented Jun 13, 2018

Hi @namitk

I use your library in a Java environment; the following is sample code:

w2v.printSchema();
RDD<Tuple2<Object, Vector>> map = w2v.javaRDD()
        .map(row -> new Tuple2<>(row.getAs("index"),
                (org.apache.spark.ml.linalg.Vector) row.getAs("vectors")))
        .rdd();
System.out.println("toRDD:" + map.count());

LSHNearestNeighborSearchModel<CosineSignRandomProjectionModel> model =
        new CosineSignRandomProjectionNNS("signrp")
                .setNumHashes(300)
                .setSignatureLength(15)
                .setJoinParallelism(5000)
                .setBucketLimit(1000)
                .setShouldSampleBuckets(true)
                .setNumOutputPartitions(100)
                .createModel(vectorSize);

// get 100 nearest neighbors for each item in items from within itself
// RDD[(Long, Long, Double)]
RDD<Tuple3<Object, Object, Object>> selfAllNearestNeighbors = model.getSelfAllNearestNeighbors(map, 6);
selfAllNearestNeighbors.toJavaRDD().foreach(tu ->
        System.out.println(tu._1() + " " + tu._2() + " " + tu._3()));

+--------------------------------+-----+----------------------------------------------------------------------------------------------------+
| rid|index| vectors|
+--------------------------------+-----+----------------------------------------------------------------------------------------------------+
|78c75b8dd86f96e084143b51ffe9c0ee| 0|[0.0267901596636572,-0.05762127241791859,-0.023966949549868367,-0.04586133027169126,-0.0069206741...|
|d16432bde1375a92667013ce6b36d0b2| 1|[0.06872010593640229,-0.07356948241944554,-0.04449785742076077,-0.004567217407902295,-0.036333482...|
|b15726a4ab9c742689ce956f126cdadb| 2|[0.04203528417482075,-0.10665688237069007,-0.06613768664555757,0.008198245994699949,0.00161943160...|
+--------------------------------+-----+----------------------------------------------------------------------------------------------------+

root
|-- rid: string (nullable = false)
|-- index: long (nullable = false)
|-- vectors: vector (nullable = false)

toRDD:3

[2018-06-13 09:53:33,873][ERROR] Executor : Exception in task 2325.0 in stage 16.0 (TID 3350)
java.io.NotSerializableException: scala.collection.mutable.PriorityQueue$$anon$3
Serialization stack:
- object not serializable (class: scala.collection.mutable.PriorityQueue$$anon$3, value: non-empty iterator)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:699)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-06-13 09:53:33,912][ERROR] TaskSetManager : Task 2325.0 in stage 16.0 (TID 3350) had a not serializable result: scala.collection.mutable.PriorityQueue$$anon$3
Serialization stack:
- object not serializable (class: scala.collection.mutable.PriorityQueue$$anon$3, value: non-empty iterator); not retrying

@namitk
Contributor

namitk commented Jun 13, 2018

The Scala ticket that @joerenner pointed to indicates this issue was fixed in Scala 2.11.0-M7. That said, I came across BoundedPriorityQueue in the Spark code base, which I hadn't used because it is private to org.apache.spark. I noticed, however, that it uses a Java priority queue, probably to avoid this same issue. I'll look into whether I can do something similar.

That said, at least on my side, I only get this issue when running Spark in local mode; the library seems to run fine for me in cluster mode. I don't fully understand why yet and will try to dig further.
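
For reference, here is a rough sketch (mine, not the project's TopNQueue; names and signatures are illustrative) of what a java.util.PriorityQueue-backed bounded top-N queue could look like, exposing results as a materialized collection rather than an iterator:

import java.util.{PriorityQueue => JPriorityQueue}
import scala.collection.JavaConverters._

// Illustrative sketch: keep the n closest (id, distance) pairs.
// Backed by java.util.PriorityQueue; results are returned as a strict
// IndexedSeq so nothing lazy ends up in a Spark task result.
class BoundedTopN(n: Int) extends Serializable {
  // max-heap on distance: the current worst candidate sits at the head
  private val heap = new JPriorityQueue[(Long, Double)](
    n, Ordering.by[(Long, Double), Double](_._2).reverse)

  def offer(item: (Long, Double)): Unit = {
    if (heap.size < n) {
      heap.add(item)
    } else if (item._2 < heap.peek()._2) {
      heap.poll()
      heap.add(item)
    }
  }

  // materialize (sorted by ascending distance) before handing back to Spark
  def values: IndexedSeq[(Long, Double)] = heap.asScala.toIndexedSeq.sortBy(_._2)
}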

@nicola007b

nicola007b commented Oct 3, 2018

Hi @namitk, is there any update on this issue?
I tried to reimplement TopNQueue using java.util.PriorityQueue but still got serialization issues. The gist is here:
https://gist.github.com/nicola007b/454bc77c435cff65e5cdd73ced316e1c

18/10/04 07:48:59 INFO DAGScheduler: ShuffleMapStage 3 (zipPartitions at LSHNearestNeighborSearchModel.scala:265) failed in 0.768 s due to Job aborted due to stage failure: Task 0.0 in stage 3.0 (TID 4) had a not serializable result: scala.collection.LinearSeqLike$$anon$1
Serialization stack:
	- object not serializable (class: scala.collection.LinearSeqLike$$anon$1, value: non-empty iterator)

@oscaroboto

Hi @nicola007b, I tried the same thing and swapped the Scala PriorityQueue for java.util.PriorityQueue, but I then ran into the same serialization error.

Could this new issue be caused by the mutable.Map or mutable.ArrayBuffer in LSHNearestNeighborSearchModel.scala, given that the Scala PriorityQueue is in fact a mutable.PriorityQueue from the same mutable collections library?

@nicola007b

nicola007b commented Nov 29, 2018

@oscaroboto thanks for following up on this. I believe the issue is in
https://github.com/linkedin/scanns/blob/master/scanns/src/main/scala/com/linkedin/nn/model/LSHNearestNeighborSearchModel.scala#L55, which takes as input an Iterator[Array[mutable.ArrayBuffer[ItemId]]], and that Iterator is not serializable.
@namitk what are your thoughts on this?
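
To double-check whether it is the mutable collections or the iterator (re @oscaroboto's question above), a small sketch of mine, not project code: the mutable collections serialize fine on their own, while a lazy iterator fails in the same way as the LinearSeqLike$$anon$1 in the trace above.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

// Quick check: the collections serialize, the iterator does not.
val out = new ObjectOutputStream(new ByteArrayOutputStream())
out.writeObject(mutable.Map(1L -> "a"))      // ok
out.writeObject(mutable.ArrayBuffer(1L, 2L)) // ok
out.writeObject(List(1L, 2L).iterator)       // java.io.NotSerializableException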

@Itfly

Itfly commented Dec 3, 2018

I also got this issue. After replacing every Iterator in the NearestNeighborIterator class (except the outer Iterator of the return type) with Seq/List, the exception is gone.

@MarkTickner

@Itfly would you mind posting exactly what you changed please?

@oscaroboto

@Itfly I used IndexedSeq instead of Seq/List. A plain Seq is a special case of Iterable and gave me the same error; List worked too, but I thought IndexedSeq might give better performance.

@oscaroboto

There is no need to modify TopNQueue.

@nicola007b

@oscaroboto @Itfly it would be great if you could open a PR with the fix

@oscaroboto

@nicola007b I'm running some tests and will open a PR soon

@Itfly

Itfly commented Dec 6, 2018

@MarkTickner I changed NearestNeighborIterator to

private[model] class NearestNeighborIterator(buckets: Seq[Seq[Seq[ItemId]]],
                                itemVectors: mutable.Map[ItemId, Vector],
                                numNearestNeighbors: Int) extends Iterator[(ItemId, List[ItemIdDistancePair])]

and added an index to replicate the original buckets iterator's hasNext and next methods.

My guess is that Iterator does not support serialization, so none of the members of NearestNeighborIterator should be Iterator objects, including the value returned by the overridden next method, which may be serialized/deserialized by Spark.
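
The general rule that follows (a standalone sketch of mine, not scanns code, assuming a SparkContext sc such as the one in spark-shell): materialize anything lazy before it leaves a transformation.

import scala.collection.mutable

val rdd = sc.parallelize(Seq(Seq(3, 1, 2), Seq(6, 5, 4)))

// This would fail: each record would carry a non-serializable PriorityQueue iterator.
// rdd.map(xs => mutable.PriorityQueue(xs: _*).iterator).collect()

// This works: dequeueAll materializes the queue into a strict, serializable collection.
val sorted = rdd.map(xs => mutable.PriorityQueue(xs: _*).dequeueAll.toIndexedSeq).collect()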

@Itfly

Itfly commented Dec 6, 2018

@oscaroboto thanks for your advice. I'm a scala/spark beginner.

@zhihuawon

@namitk @oscaroboto @Itfly did you solve this issue?

@Itfly

Itfly commented Dec 10, 2018

@zhihuawon Here's my modification, hope this can help you too.

private[model] class NearestNeighborIterator(buckets: IndexedSeq[Array[mutable.ArrayBuffer[ItemId]]],
                                itemVectors: mutable.Map[ItemId, Vector],
                                numNearestNeighbors: Int) extends Iterator[(ItemId, IndexedSeq[ItemIdDistancePair])]
    with Serializable {

    private val bucketsSize = buckets.size
    private var bucketsIndex = 0

    // this will be the next element that the iterator returns on a call to next()
    private var nextResult: Option[(ItemId, IndexedSeq[ItemIdDistancePair])] = None

    // this is the current tuple in the bucketsIt iterator that is being scanned
    private var currentTuple = if (hasNextBucket()) Some(nextBucket()) else None

    // this is the index in the first array of currentTuple which is being scanned
    private var currentIndex = 0

    private def hasNextBucket(): Boolean = {
      bucketsIndex < bucketsSize
    }

    private def nextBucket(): Array[mutable.ArrayBuffer[ItemId]] = {
      val result = buckets(bucketsIndex)
      bucketsIndex += 1
      result
    }

    private def populateNext(): Unit = {
      var done = false
      while (currentTuple.isDefined && !done) {
        currentTuple match {
          case Some(x) =>
            while (currentIndex < x(0).size && !done) {
              val queue = new TopNQueue(numNearestNeighbors)
              val currentId = x(0)(currentIndex)
              x(1).filter(_ != currentId)
                .map(c => (c, distance.compute(itemVectors(c), itemVectors(currentId))))
                .foreach(queue.enqueue(_))
              if (queue.nonEmpty()) {
                nextResult = Some((currentId, queue.values()))
                //nextResult = Some((x(0)(currentIndex), queue.iterator()))
                done = true
              }
              currentIndex += 1
            }
            if (currentIndex == x(0).size) {
              currentIndex = 0
              currentTuple = if (hasNextBucket()) Some(nextBucket()) else None
            }
          case _ =>
        }
      }
      if (currentTuple.isEmpty && !done) {
        nextResult = None
      }
    }

    populateNext()

    override def hasNext: Boolean = nextResult.isDefined

    override def next(): (ItemId, IndexedSeq[ItemIdDistancePair]) = {
      if (hasNext) {
        val ret = nextResult.get
        populateNext()
        ret
      } else {
        null
      }
    }
  }

Then use hashBuckets.values.toIndexedSeq to instantiate NearestNeighborIterator. You also need to add a values method to TopNQueue:

  def values(): IndexedSeq[ItemIdDistancePair] = priorityQ.dequeueAll.reverse
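
With that change the values leaving TopNQueue are a plain strict collection. As a rough sanity check (a sketch, assuming the TopNQueue constructor and enqueue signature used in the code above), the result round-trips through Java serialization, unlike the old iterator:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val q = new TopNQueue(3)
Seq((1L, 0.10), (2L, 0.50), (3L, 0.30)).foreach(q.enqueue(_))
val out = new ObjectOutputStream(new ByteArrayOutputStream())
out.writeObject(q.values()) // ok: an IndexedSeq, not a PriorityQueue iterator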

@zhihuawon

@Itfly thanks a lot

@oscaroboto

Sorry for the delay everyone. I finally got a chance to make the PR.

@tatsuya-takahashi

Any updates? It does not work for me.

@danielmelemed

I am also facing the same issue.

@pmaditya

I am also facing the same issue. Has it been fixed?

@LanceNorskog

If this is a dead project, could you mark it as such instead of wasting people's time?

@oscaroboto

This appears to be a dead project 😔

@namitk
Contributor

namitk commented Mar 29, 2019

Hey folks, I'm really sorry I haven't been on top of things and didn't notice @oscaroboto's PR #6. I switched jobs in Oct 2018 and it seems this isn't being maintained by anyone at LinkedIn anymore. Since leaving, I no longer work in an environment where I use or have access to a Spark cluster, so I haven't been keeping up with the project. I have commented on the PR, however, and will be happy to review if one of you can help fix this issue.

Apologies :(

@oscaroboto

Hi @namitk! Thank you for your reply! I just made a PR.

@namitk
Contributor

namitk commented Mar 31, 2019

I wanted to ask where you are seeing this error. As I mentioned above in the thread, I only see it in the local unit test; the same code executed from a spark-shell in both local mode and cluster mode runs successfully. Is that not what you observe? If not, could you tell me which versions of Spark and scanns, and which Java/Scala versions, you are running when you hit this error outside a local test?

I used it as follows (this also works with the Scala 2.10 versions of Spark and scanns):
../spark-2.1.0-bin-hadoop2.7/bin/spark-shell --jars build/scanns_2.11/libs/scanns_2.11-1.0.1.jar

@chen7572

chen7572 commented Apr 1, 2019

@namitk I got this error using scanns on Databricks with Scala 2.11. After making all the changes from @oscaroboto's pull request, scanns now works for me.

ZhaoHonghong added a commit to adRise/scanns that referenced this issue Dec 14, 2021