scala - Building an Inverted Index exceeds the Java Heap Size


This might be a special case, but after pounding my head on it for a while I thought I'd ask the StackOverflow community.

I am building an inverted index for a large data set (one day's worth of data from a large system). The inverted index is built as a map/reduce job on Hadoop, and the index-building code is written in Scala. The structure of the inverted index is as follows: {key:"new", productid:[1,2,3,4,5,...]}. These records are written to Avro files.
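IndexRecord is the Avro-generated record class. Purely for illustration (this is my rough stand-in with assumed field names, not the generated code), its shape is roughly:

  // Rough stand-in for the Avro-generated IndexRecord class:
  // one token plus the Java list of product ids that contain it.
  class IndexRecord(
    val token: String,                              // e.g. "new"
    val productIds: java.util.List[java.lang.Long]  // e.g. [1, 2, 3, 4, 5, ...]
  )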

During this process I run into a Java heap size issue. I think the reason is that terms like "new", shown above, contain a very large number of productid(s). I have a rough idea of where the issue takes place in the Scala code:

  import scala.collection.JavaConverters._ // provides .asJava

  def toIndexedRecord(ids: List[Long], token: String): IndexRecord = {
    val javaList = ids.map(l => l: java.lang.Long).asJava // need to convert Scala Long to java.lang.Long
    new IndexRecord(token, javaList)
  }
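For instance, calling it on a tiny list (just to show the shape of the result; for hot tokens the real lists are enormous) gives a record wrapping a Java list of boxed longs:

  // tiny illustrative call
  val record = toIndexedRecord(List(1L, 2L, 3L), "new")
  // record now wraps the token "new" and a java.util.List containing 1, 2, 3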

And this is how I use that method (the same code structure and logic are used in many places):

  val titles = textPipeDump.map(vf => (vf.itemId, Normalizer.customNormalizer(vf.title + " " + vf.subtitle).trim))
    .flatMap {
      case (id, title) =>
        val ss = title.split("\\s+")
        ss.map(word => (word, List(id)))
    }
    .filter(f => f._2.nonEmpty)
    .group
    .sum
    .map {
      case (token, ids) =>
        toIndexedRecord(ids, token)
    }

textPipeDump is a Scalding MultipleTextLine fields object:

  case class MultipleTextLineFiles(p: String*) extends FixedPathSource(p: _*) with TextLineScheme

I have a case class that splits the text line and grabs the fields I want, and that is the object ss.
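The .sum above is where the stack trace below ends up: for List values it uses Algebird's ListMonoid, which simply concatenates lists, so the reducer builds the complete id list for a token in memory. A minimal sketch of that behaviour (my simplification, not the actual Scalding/Algebird source):

  // Simplified view of ListMonoid.plus: combining two partial id lists
  // allocates a new, longer list each time.
  def plus(left: List[Long], right: List[Long]): List[Long] = left ++ right

  // For a very common token such as "new", .sum effectively folds millions
  // of single-element List(id) values into one huge in-memory List[Long].
  val idsForNew = Seq(List(1L), List(2L), List(3L) /* ...millions more... */).reduce(plus)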

Here is the stack trace:

Exception in thread "IPC Client (47) connection to /127.0.0.1:55977 from job_201306241658_232590" java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.IOUtils.closeStream(IOUtils.java:226)
    at org.apache.hadoop.ipc.Client$Connection.close(Client.java:903)
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:800)
28079664 [main] ERROR cascading.flow.stream.TrapHandler - caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [WritableSequenceFile(h...][com.twitter.scalding.GroupBuilder$$anonfun$1.apply(GroupBuilder.scala:189)] operator Every failed executing operation: MRMAggregator[decl:'value']
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:136)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:39)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:49)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
    at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:90)
    at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:520)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1178)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    at scala.collection.immutable.List.$colon$colon$colon(List.scala:127)
    at scala.collection.immutable.List.$plus$plus(List.scala:193)
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:86)
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:84)
    at com.twitter.scalding.KeyedList$$anonfun$sum$1.apply(TypedPipe.scala:264)
    at com.twitter.scalding.MRMAggregator.aggregate(Operations.scala:279)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:128)
    ... 12 more

When I execute the map/reduce job on a small data set I don't get the error. That means that as the data grows, the number of items/product_ids that I index for words like "new" or "old" etc. gets bigger and causes the heap to overflow.
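A back-of-the-envelope estimate (the per-element cost is my assumption for a 64-bit JVM, not a measured value) shows how a single hot token can exhaust the reducer heap when the ids live in a boxed List:

  // Assumed costs: ~16 bytes per boxed java.lang.Long plus ~32 bytes per
  // immutable List cons cell, i.e. roughly 48 bytes per id.
  val idsForHotToken   = 50000000L   // hypothetical: 50M products containing "new"
  val bytesPerIdApprox = 48L
  val approxHeapGb = idsForHotToken * bytesPerIdApprox / (1024.0 * 1024 * 1024)
  println(f"~$approxHeapGb%.1f GB of heap for a single token") // ~2.2 GB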

So, the question is: how can I avoid the Java heap size overflow and accomplish this task?

