What is the equivalent to scala.util.Try in pyspark?

I've got a lousy HTTPD access_log and just want to skip the "lousy" lines.

In Scala this is straightforward:

import scala.util.Try

val log = sc.textFile("access_log")

log.map(_.split(' ')).map(a => Try(a(8))).filter(_.isSuccess).map(_.get).map(code => (code,1)).reduceByKey(_ + _).collect()

For Python I've got the following solution, which explicitly defines a function instead of using the "lambda" notation:

log = sc.textFile("access_log")

def wrapException(a):
    try:
        return a[8]
    except:
        return 'error'

log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect()

Is there a better way of doing this (e.g. like in Scala) in pyspark?

Thanks a lot!

Romeo Kienzler asked Oct 28 '15 05:10


2 Answers

Better is a subjective term but there are a few approaches you can try.

  • The simplest thing you can do in this particular case is to avoid exceptions altogether. All you need is a flatMap and some slicing:

    log.flatMap(lambda s : s.split(' ')[8:9])
    

    As you can see, there is no need for exception handling or a subsequent filter: a slice that runs past the end of the list is simply empty.

  • The previous idea can be extended with a simple wrapper:

    def seq_try(f, *args, **kwargs):
        try:
            return [f(*args, **kwargs)]
        except Exception:  # a bare except would also swallow KeyboardInterrupt and SystemExit
            return []
    

    and example usage:

    # operator.div exists only on Python 2; truediv works on both Python 2 and 3.
    from operator import truediv as div # FYI operator provides getitem as well.
    
    rdd = sc.parallelize([1, 2, 0, 3, 0, 5, "foo"])
    
    rdd.flatMap(lambda x: seq_try(div, 1., x)).collect()
    ## [1.0, 0.5, 0.3333333333333333, 0.2]
    
  • Finally, a more OO approach:

    import inspect as _inspect
    
    class _Try(object): pass    
    
    class Failure(_Try):
        def __init__(self, e):
            if Exception not in _inspect.getmro(e.__class__):
                msg = "Invalid type for Failure: {0}"
                raise TypeError(msg.format(e.__class__))
            self._e = e
            self.isSuccess =  False
            self.isFailure = True
    
        def get(self): raise self._e
    
        def __repr__(self):
            return "Failure({0})".format(repr(self._e))
    
    class Success(_Try):
        def __init__(self, v):
            self._v = v
            self.isSuccess = True
            self.isFailure = False
    
        def get(self): return self._v
    
        def __repr__(self):
            return "Success({0})".format(repr(self._v))
    
    def Try(f, *args, **kwargs):
        try:
            return Success(f(*args, **kwargs))
        except Exception as e:
            return Failure(e)
    

    and example usage:

    tries = rdd.map(lambda x: Try(div, 1.0, x))
    tries.collect()
    ## [Success(1.0),
    ##  Success(0.5),
    ##  Failure(ZeroDivisionError('float division by zero',)),
    ##  Success(0.3333333333333333),
    ##  Failure(ZeroDivisionError('float division by zero',)),
    ##  Success(0.2),
    ##  Failure(TypeError("unsupported operand type(s) for /: 'float' and 'str'",))]
    
    tries.filter(lambda x: x.isSuccess).map(lambda x: x.get()).collect()
    ## [1.0, 0.5, 0.3333333333333333, 0.2]
    

    You can even emulate pattern matching with multipledispatch:

    from multipledispatch import dispatch
    from operator import getitem
    
    @dispatch(Success)
    def check(x): return "Another great success"
    
    @dispatch(Failure)
    def check(x): return "What a failure"
    
    a_list = [1, 2, 3]
    
    check(Try(getitem, a_list, 1))
    ## 'Another great success'
    
    check(Try(getitem, a_list, 10)) 
    ## 'What a failure'
    

    If you like this approach, I've pushed a slightly more complete implementation to GitHub and PyPI.
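
To tie the first two bullets back to the access-log case, here is a minimal local sketch that needs no Spark. It rests on a few assumptions: the sample lines are made up, flat_map is a hypothetical pure-Python stand-in for RDD.flatMap, and seq_try is copied from the wrapper above with the exception clause narrowed to Exception. It shows that slicing past the end of a list yields an empty list instead of raising, and that the wrapper combined with operator.getitem gives the same result.

```python
from itertools import chain
from operator import getitem


def seq_try(f, *args, **kwargs):
    """Return [result] on success and [] if f raises an Exception."""
    try:
        return [f(*args, **kwargs)]
    except Exception:
        return []


def flat_map(f, items):
    """Hypothetical local stand-in for RDD.flatMap: apply f, then flatten."""
    return list(chain.from_iterable(f(x) for x in items))


# Made-up sample lines; the second one has too few fields.
lines = [
    "a b c d e f g h 200 j",
    "garbage line",
    "a b c d e f g h 404",
]

# Slicing never raises: an out-of-range slice is simply empty.
by_slice = flat_map(lambda s: s.split(' ')[8:9], lines)

# Same selection via the wrapper and operator.getitem.
by_wrapper = flat_map(lambda s: seq_try(getitem, s.split(' '), 8), lines)

print(by_slice)
print(by_wrapper)
```

With a real RDD, the same two lambdas could be passed to log.flatMap directly; the slice version is shorter, while the wrapper generalises to any function that may raise.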

zero323 answered Sep 28 '22 00:09


First, let me generate some random data to start working with.

import random
number_of_rows = int(1e6)
line_error = "error line"
text = []
for i in range(number_of_rows):
    choice = random.choice([1,2,3,4])
    if choice == 1:
        line = line_error
    elif choice == 2:
        line = "1 2 3 4 5 6 7 8 9_1"
    elif choice == 3:
        line = "1 2 3 4 5 6 7 8 9_2"
    elif choice == 4:
        line = "1 2 3 4 5 6 7 8 9_3"
    text.append(line)

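Now I have a list of strings, text, that looks like this (it still has to be loaded into the RDD log used below, for example with sc.parallelize(text)):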

  1 2 3 4 5 6 7 8 9_2
  error line
  1 2 3 4 5 6 7 8 9_3
  1 2 3 4 5 6 7 8 9_2
  1 2 3 4 5 6 7 8 9_3
  1 2 3 4 5 6 7 8 9_1
  error line
  1 2 3 4 5 6 7 8 9_2
  ....

Your solution:

def wrapException(a):
    try:
        return a[8]
    except:
        return 'error'

log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect()

#[('9_3', 250885), ('9_1', 249307), ('9_2', 249772)]

Here is my solution:

from operator import add
def myfunction(l):
    try:
        return (l.split(' ')[8],1)
    except: 
        return ('MYERROR', 1) 
log.map(myfunction).reduceByKey(add).collect()
#[('9_3', 250885), ('9_1', 249307), ('MYERROR', 250036), ('9_2', 249772)]

Comment:

(1) I highly recommend also counting the "error" lines. It adds little overhead and doubles as a sanity check: all the counts should add up to the total number of rows in the log. If you filter those lines out, you cannot tell whether they were truly bad lines or whether something went wrong in your own logic.

(2) I would package all the line-level operations in one function rather than chaining map and filter calls, so the code is more readable.

(3) Performance-wise, I generated a sample of 1M records; my code finished in 3 seconds and yours in 2 seconds. This is not a fair comparison, since the data is so small and my cluster is pretty beefy. I would recommend generating a bigger file (1e12?) and running the benchmark yourself.
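
The sanity check from comment (1) can be sketched locally without Spark. This is only an illustration under assumptions: the sample lines are made up, status_or_error is a hypothetical helper, and collections.Counter stands in for the map plus reduceByKey(add) step.

```python
from collections import Counter


def status_or_error(line):
    """Return the 9th space-separated field, or 'MYERROR' if it is missing."""
    try:
        return line.split(' ')[8]
    except IndexError:
        return 'MYERROR'


# Made-up sample in the same shape as the generated data above.
text = [
    "1 2 3 4 5 6 7 8 9_2",
    "error line",
    "1 2 3 4 5 6 7 8 9_3",
    "1 2 3 4 5 6 7 8 9_2",
    "error line",
]

# Counter plays the role of map(...).reduceByKey(add) here.
counts = Counter(status_or_error(line) for line in text)

# Bad lines are counted rather than dropped, so every input line is
# accounted for and the totals must match.
assert sum(counts.values()) == len(text)
print(dict(counts))
```

The same check holds for the output shown above: 250885 + 249307 + 250036 + 249772 = 1,000,000, which is the number of generated rows.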

B.Mr.W. answered Sep 28 '22 01:09