HAKATEMIA
04Building Burp Suite extensions with the legacy Extender API

Developing an extension with Extender API: automatic session management part 1

Medium30MIN

Plugin registration

In this module, we will solve a real-world problem by automatically handling situations where the application being tested uses very short login times. This typically becomes a problem when trying to use any kind of automation, perform scans, or use tools that take longer to execute than the time the user stays logged in to the application.

PY
1from burp import IBurpExtender
2from burp import IHttpListener
3
4class BurpExtender(IBurpExtender, IHttpListener):
5  def registerExtenderCallbacks(self, callbacks):
6    self.callbacks = callbacks
7    self.helpers = callbacks.getHelpers()
8    callbacks.setExtensionName("Handle Authentication Plugin")
9    callbacks.registerHttpListener(self)
10    self.session_token = None

Let's start with the familiar plugin registration. Here we're just adding a new object called self.session_token. This will be used later, but in practice is just a storage location for the current session cookie.

Login programming

Next, we need a function that can programmatically perform the login present in the application and update the new session cookie obtained from it to the previously defined session_token variable.

PY
1```python
2def makeAuth(self):
3    try:
4        headers = [
5            "POST /login HTTP/1.1",
6            "Host: 127.0.0.1:5000",
7            "Content-Length: 17",
8            "Content-Type: application/x-www-form-urlencoded"
9        ]
10        body = "username=username"
11        
12        auth_message = self.helpers.buildHttpMessage(headers, body)
13        host = "127.0.0.1"
14        port = 5000
15        use_https = False
16        
17        resp = self.callbacks.makeHttpRequest(host, port, use_https, auth_message)
18        resp_info = self.helpers.analyzeResponse(resp)
19        cookie = resp_info.getHeaders()[7]
20        self.session_token = cookie.split("session=")[-1].split(";")[0]
21    except Exception as e:
22        print(e)
TEXT
1
2In the *makeAuth* function above, we practically construct an HTTP request to log in to the service, then send this HTTP request using the [*makeHttpRequest*](https://portswigger.net/burp/extender/api/burp/iburpextendercallbacks.html#makeHttpRequest-java.lang.String-int-boolean-byte:A-) function. We read the incoming response and extract a new login cookie that the service created for us as a result of the login.
3
4The *makeHttpRequest* function takes the following parameters:
5
6- *host* - the domain name to which the request is sent
7- *port* - the port to which the request is sent
8- *use\_https* - using an HTTPS connection
9- *auth\_message* - the HTTP request that was just constructed
10
11### Updating cookies in headers
12
13Next, we create another function that updates the current login cookie in the given header list.
14
15```python
16```python
17def updateCookies(self, headers):
18    new_headers = []
19    cookie_seen = False
20    for header in headers:
21        if "Cookie:" in header:
22            new_headers.append("Cookie: session={}".format(self.session_token))
23            cookie_seen = True
24        else:
25            new_headers.append(header)
26    if not cookie_seen:
27        new_headers.append("Cookie: session={}".format(self.session_token))
28    return new_headers
TEXT
1
2This can, of course, be done within the *processHttpMessage* function, but in this example, we have separated it into its own function. The function returns an updated list of headers with the current cookie.
3
4### Building the processing of HTTP request and response
5
6Finally, we write the necessary logic for processing HTTP messages in the *processHttpMessage* function:
7
8```python
9```python
10def processHttpMessage(self, tool_flag, is_request, message_info):
11    if is_request:
12        request = message_info.getRequest()
13        request_info = self.helpers.analyzeRequest(request)
14        # If the target is correct
15        if "127.0.0.1:5000" in request_info.getHeaders()[1]:
16            # if the session cookie is saved
17            if self.session_token:
18                # update the current session cookie in the request
19                new_headers = self.updateCookies(request_info.getHeaders())
20                body = request[request_info.getBodyOffset():]
21                new_message = self.helpers.buildHttpMessage(new_headers, body)
22                message_info.setRequest(new_message)
TEXT
1
2for HTTP responses:
3
4```python
5```python
6else:
7    response = message_info.getResponse()
8    response_info = self.helpers.analyzeResponse(response)
9    # get the status code and if it is 302, and
10    # the redirect is happening to path /, we know
11    # that the session has expired because the response takes us
12    # to the login page
13    
14    if int(response_info.getStatusCode()) == 302 and "Location: /" in response_info.getHeaders():
15        # Update the login cookie by performing
16        # the login again
17        self.makeAuth()
18        
19        # Repeat the previously failed HTTP request, where
20        # the server requested a login again
21        req = message_info.getRequest()
22        req_i = self.helpers.analyzeRequest(req)
23        new_headers = self.updateCookies(req_i.getHeaders())
24        body = req[req_i.getBodyOffset():]
25        new_message = self.helpers.buildHttpMessage(new_headers, body)
26        host = "127.0.0.1"
27        port = 5000
28        use_https = False
29        success_resp = self.callbacks.makeHttpRequest(host, port, use_https, new_message)
30        # Set the response of the repeated HTTP request
31        # in place of the response requiring re-login
32        message_info.setResponse(success_resp)
33    return
TEXT
1
2So in practice, our code reacts to situations where the server responds in a certain way that indicates the login has expired. We then programmatically perform the login ourselves, save the new cookie obtained from it, and use this in future calls. The only complicating factor is that when we notice that the login has expired, we have to repeat the original HTTP request to which the server initially responded so that the session is no longer valid. Once we have completed this with the new cookie, we then set this response in place of the response requiring re-login. This way, no tool is aware that a re-login was performed on the fly during execution, and thus nothing breaks.
3
4### Final code
5
6The final code will look something like this:
7
8```python
9```python
10from burp import IBurpExtender
11from burp import IHttpListener
12
13class BurpExtender(IBurpExtender, IHttpListener):
14    def registerExtenderCallbacks(self, callbacks):
15        self.callbacks = callbacks
16        self.helpers = callbacks.getHelpers()
17        callbacks.setExtensionName("Handle Authentication Plugin")
18        callbacks.registerHttpListener(self)
19        self.session_token = None
20
21    def makeAuth(self):
22        try:
23            headers = [
24                "POST /login HTTP/1.1",
25                "Host: 127.0.0.1:5000",
26                "Content-Length: 17",
27                "Content-Type: application/x-www-form-urlencoded"
28            ]
29            body = "username=username"
30
31            auth_message = self.helpers.buildHttpMessage(headers, body)
32            host = "127.0.0.1"
33            port = 5000
34            use_https = False
35
36            resp = self.callbacks.makeHttpRequest(host, port, use_https, auth_message)
37            resp_info = self.helpers.analyzeResponse(resp)
38            cookie = resp_info.getHeaders()[7]
39            self.session_token = cookie.split("session=")[-1].split(";")[0]
40        except Exception as e:
41            print(e)
42
43    def updateCookies(self, headers):
44        new_headers = []
45        cookie_seen = False
46        for header in headers:
47            if "Cookie:" in header:
48                new_headers.append("Cookie: session={}".format(self.session_token))
49                cookie_seen = True
50            else:
51                new_headers.append(header)
52        if not cookie_seen:
53            new_headers.append("Cookie: session={}".format(self.session_token))
54        return new_headers
55
56    def processHttpMessage(self, tool_flag, is_request, message_info):
57        if is_request:
58            request = message_info.getRequest()
59            request_info = self.helpers.analyzeRequest(request)
60
61            if "127.0.0.1:5000" in request_info.getHeaders()[1]:
62                if self.session_token:
63                    new_headers = self.updateCookies(request_info.getHeaders())
64                    body = request[request_info.getBodyOffset():]
65                    new_message = self.helpers.buildHttpMessage(new_headers, body)
66                    message_info.setRequest(new_message)
67
68        else:
69            response = message_info.getResponse()
70            response_info = self.helpers.analyzeResponse(response)
71            if int(response_info.getStatusCode()) == 302 and "Location: /" in response_info.getHeaders():
72                # Session has expired
73                self.makeAuth()  # Update session token
74
75                # Replay the failed request with the new session token
76                req = message_info.getRequest()
77                req_i = self.helpers.analyzeRequest(req)
78                new_headers = self.updateCookies(req_i.getHeaders())
79                body = req[req_i.getBodyOffset():]
80                new_message = self.helpers.buildHttpMessage(new_headers, body)
81                host = "127.0.0.1"
82                port = 5000
83                use_https = False
84                success_resp = self.callbacks.makeHttpRequest(host, port, use_https, new_message)
85                message_info.setResponse(success_resp)
86        return
1 / 2
Hakatemia Pro

Learn to hack — start here

Hundreds of interactive courses, virtual labs and CTF challenges in your browser. Start a free trial — no card required.