
optimization - PySpark socket timeout exception after the application has been running for a while

I am using PySpark to estimate the parameters of a logistic regression model. I use Spark to calculate the likelihood and gradients, and then use SciPy's minimize function (L-BFGS-B) for the optimization.
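Roughly, the structure looks like the sketch below (the names log_lik_term, grad_term, data_rdd and theta0 are placeholders, not my actual code): Spark aggregates the per-record log-likelihood and gradient contributions, and the collected results are fed to the optimizer on the driver.

    # Sketch only -- log_lik_term / grad_term / data_rdd / theta0 are placeholders.
    import numpy as np
    from scipy.optimize import minimize

    def neg_log_likelihood(theta):
        # each Spark task computes its records' contribution; the driver sums them
        return -data_rdd.map(lambda row: log_lik_term(row, theta)).sum()

    def neg_gradient(theta):
        # gradient contributions are summed across the cluster
        return -np.asarray(
            data_rdd.map(lambda row: grad_term(row, theta)).reduce(lambda a, b: a + b))

    res = minimize(neg_log_likelihood, theta0, jac=neg_gradient,
                   method='L-BFGS-B', options={'disp': False})

So every evaluation of the objective or the gradient launches a Spark job and pulls the result back to the driver.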

I run my application in yarn-client mode. The application starts and runs without any problem at first, but after a while it reports the following error:

Traceback (most recent call last):
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
    res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
    options={'disp': False})
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
    callback=callback, **options)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
    f, g = func_and_grad(x)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
    f = fun(x, *args)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
    return function(*(wrapper_args + args))
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
    return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
    vals = self.mapPartitions(func).collect()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
    yield self._read_with_length(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
    data = self._sock.recv(left)
socket.timeout: timed out

I also found a Python broken pipe error when I set the Spark log level to "ALL".

I am using Spark 1.6.2 and Java 1.8.0_91. Any idea what's going on?

--Update--

I found that this is related to the optimization routine I use in my program.

What I am doing is estimating a statistical model by maximum likelihood using the EM algorithm (an iterative algorithm). During each iteration, I need to update the parameters by solving a minimization problem. Spark is responsible for calculating the likelihood and gradient, which are then passed to SciPy's minimize routine, where I use the L-BFGS-B method. It seems that something in this routine crashes my Spark job, but I have no idea which part of the routine is responsible for the issue.
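For reference, the overall structure of trainEM looks roughly like this (all names such as e_step_weights, observed_q, pack, unpack and converged are placeholders; only the shape of the loop matters):

    # Placeholder sketch of the EM loop; every objective evaluation inside
    # minimize() launches a Spark job (a map followed by sum()).
    from scipy.optimize import minimize

    params = params0
    for iteration in range(max_iter):
        # E-step: compute posterior weights on the cluster and cache them
        dataAndWeights = distData.map(
            lambda row: (row, e_step_weights(row, params))).cache()

        # M-step: scipy minimizes the expected negative complete-data
        # log-likelihood; each objective call collects a sum from Spark
        def objective(v):
            return -dataAndWeights.map(
                lambda dw: observed_q(dw[0], dw[1], v, params)).sum()

        res = minimize(objective, pack(params), method='L-BFGS-B',
                       options={'disp': False})
        new_params = unpack(res.x)

        if converged(params, new_params, tol=1e-5):
            break
        params = new_params

The traceback above is raised from inside one of these objective evaluations, i.e. while Spark is sending the partial sums back to the Python driver.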

Another observation: with the same sample and the same program, I changed only the number of partitions. When the number of partitions is small, my program finishes without any problem. However, when the number of partitions becomes large, the program starts to crash.
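Concretely, the only thing I change between runs is something like the following (samples and the partition count are placeholders):

    # Placeholder sketch: only the partition count changes between runs.
    num_partitions = 200   # small values finish fine; large values eventually hit the timeout
    distData = sc.parallelize(samples, numSlices=num_partitions).cache()
    # equivalently, for an RDD that already exists:
    # distData = distData.repartition(num_partitions).cache()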


1 Answer


I had a similar problem. I had an iteration, and sometimes execution took so long that it timed out. Increasing spark.executor.heartbeatInterval seemed to solve the problem. I increased it to 3600s to make sure I don't run into timeouts again, and everything has been working fine since then.

From http://spark.apache.org/docs/latest/configuration.html:

spark.executor.heartbeatInterval (default: 10s) - Interval between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks.
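As a rough illustration (not verbatim from my job; the app name and the 7200s value below are just examples), the setting can be applied when building the SparkContext. If I remember correctly, the docs also say spark.executor.heartbeatInterval should stay well below spark.network.timeout, so with a large heartbeat interval you may need to raise that one as well:

    # Minimal sketch, assuming a plain PySpark driver script; the values are
    # illustrative, not tuned recommendations.
    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("my-iterative-job")                      # placeholder name
            .set("spark.executor.heartbeatInterval", "3600s")    # default is 10s
            # heartbeatInterval is expected to stay below spark.network.timeout,
            # so raise that too when using a large heartbeat interval:
            .set("spark.network.timeout", "7200s"))
    sc = SparkContext(conf=conf)

The same settings can also be passed on the command line, e.g. spark-submit --conf spark.executor.heartbeatInterval=3600s --conf spark.network.timeout=7200s ...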

