]>
code.delx.au - proxy/blob - test_proxy.py
14 return s
.getsockname()[1]
16 SOCKS_PORT
= get_free_port()
17 ECHO_PORT
= get_free_port()
18 ECHO_PORT_B
= struct
.pack(">H", ECHO_PORT
)
21 class SocketHelper(object):
22 def init_socket(self
):
23 self
.sock
= socket
.socket(socket
.AF_INET
)
24 self
.sock
.connect(("localhost", SOCKS_PORT
))
26 def init_ipv6_socket(self
):
27 self
.sock
= socket
.socket(socket
.AF_INET6
)
28 self
.sock
.connect(("localhost", SOCKS_PORT
))
30 def destroy_socket(self
):
34 l
= self
.sock
.send(msg
)
35 self
.assertEqual(len(msg
), l
)
37 def send_proxy_length(self
, x
):
38 self
.send(("%s" % x
).zfill(10).encode("ascii") + b
"\n")
40 def recv(self
, expected_length
):
41 result
= self
.sock
.recv(16384)
42 self
.assertEqual(expected_length
, len(result
), str(result
))
47 result
= self
.sock
.recv(1)
48 self
.assertEqual(0, len(result
), str(result
))
49 except ConnectionResetError
:
52 def assertAuthSuccess(self
):
54 self
.assertEqual(b
"\x05\x00", result
)
56 def assertAuthFail(self
):
58 self
.assertEqual(b
"\x05\xff", result
)
60 def assertRequestSuccess(self
):
61 self
.assertRequestResponse(0)
63 def assertRequestFail(self
, reply
):
64 self
.assertRequestResponse(reply
)
67 def assertRequestResponse(self
, reply
):
68 reply
= struct
.pack(">B", reply
)
69 expected
= b
"\x05" + reply
+ b
"\x00\x01\x00\x00\x00\x00\x00\x00"
70 result
= self
.recv(10)
71 self
.assertEqual(expected
, result
)
73 class TestAuthNegotiation(SocketHelper
, unittest
.TestCase
):
74 def run(self
, result
=None):
76 unittest
.TestCase
.run(self
, result
)
84 def test_one_method_success(self
):
85 self
.send(b
"\x05\x01\x00")
86 self
.assertAuthSuccess()
88 def test_two_methods_success_first(self
):
89 self
.send(b
"\x05\x02\x00\x80")
90 self
.assertAuthSuccess()
92 def test_two_methods_success_second(self
):
93 self
.send(b
"\x05\x02\x80\x00")
94 self
.assertAuthSuccess()
96 def test_no_methods_fail(self
):
97 self
.send(b
"\x05\x00")
100 def test_no_matching_methods_fail(self
):
101 self
.send(b
"\x05\x01\x80")
102 self
.assertAuthFail()
104 def test_invalid_version_fail(self
):
105 self
.send(b
"\x04\x01\x00")
109 class TestRequestNegotiation(SocketHelper
, unittest
.TestCase
):
110 def run(self
, result
=None):
112 unittest
.TestCase
.run(self
, result
)
116 self
.send(b
"\x05\x01\x00")
117 self
.assertAuthSuccess()
120 self
.destroy_socket()
122 def test_invalid_version(self
):
123 self
.send(b
"\x04\x01\x00\x01\x7f\x00\x00\x01\x00\01")
124 self
.assertRequestFail(1)
126 def test_invalid_command(self
):
127 self
.send(b
"\x05\x02\x00\x01\x7f\x00\x00\x01\x00\x01")
128 self
.assertRequestFail(7)
130 def test_invalid_reserved_section(self
):
131 self
.send(b
"\x05\x01\x01\x01\x7f\x00\x00\x01\x00\x01")
132 self
.assertRequestFail(1)
134 def test_invalid_address_type(self
):
135 self
.send(b
"\x05\x01\x00\x09\x7f\x00\x00\x01\x00\x01")
136 self
.assertRequestFail(8)
138 def test_ipv4_success(self
):
139 self
.send(b
"\x05\x01\x00\x01\x7f\x00\x00\x01" + ECHO_PORT_B
)
140 self
.assertRequestSuccess()
142 def test_ipv4_bad_port(self
):
143 self
.send(b
"\x05\x01\x00\x01\x7f\x00\x00\x01\xff\xff")
144 self
.assertRequestFail(4)
146 def test_ipv4_bad_host(self
):
147 self
.send(b
"\x05\x01\x00\x01\x7f\x00\x00\x00\xff\xff")
148 self
.assertRequestFail(4)
150 def test_dns_success(self
):
151 self
.send(b
"\x05\x01\x00\x03\x09localhost" + ECHO_PORT_B
)
152 self
.assertRequestSuccess()
154 def test_dns_bad_port(self
):
155 self
.send(b
"\x05\x01\x00\x03\x09localhost\xff\xff")
156 self
.assertRequestFail(4)
158 def test_dns_bad_host(self
):
159 self
.send(b
"\x05\x01\x00\x03\x09f.invalid" + ECHO_PORT_B
)
160 self
.assertRequestFail(4)
162 def test_dns_invalid_host(self
):
163 self
.send(b
"\x05\x01\x00\x03\x00" + ECHO_PORT_B
)
166 def test_ipv6_success(self
):
167 self
.send(b
"\x05\x01\x00\x04" + (b
"\x00"*15) + b
"\x01" + ECHO_PORT_B
)
168 self
.assertRequestSuccess()
170 def test_ipv6_bad_port(self
):
171 self
.send(b
"\x05\x01\x00\x04" + (b
"\x00"*15) + b
"\x01" + b
"\xff\xff")
172 self
.assertRequestFail(4)
174 def test_ipv6_bad_host(self
):
175 self
.send(b
"\x05\x01\x00\x04" + b
"\xfe\x80" + (b
"\x00"*13) + b
"\x01" + ECHO_PORT_B
)
176 self
.assertRequestFail(4)
179 class ProxyPacketHelper(object):
180 def test_one_packet(self
):
181 self
.send_proxy_length(3)
183 result
= self
.recv(3)
184 self
.assertEqual(b
"foo", result
)
187 def test_no_received_data(self
):
188 self
.send_proxy_length(0)
192 def test_two_packets(self
):
193 self
.send_proxy_length(6)
196 result
= self
.recv(3)
197 self
.assertEqual(b
"foo", result
)
200 result
= self
.recv(3)
201 self
.assertEqual(b
"bar", result
)
205 def test_large_packet(self
):
207 self
.send_proxy_length(len(msg
))
211 part
= self
.sock
.recv(4)
212 self
.assertEqual(b
"1234", part
)
217 class TestIPv4Proxy(SocketHelper
, ProxyPacketHelper
, unittest
.TestCase
):
218 def run(self
, result
=None):
220 unittest
.TestCase
.run(self
, result
)
225 self
.send(b
"\x05\x01\x00")
226 self
.assertAuthSuccess()
228 self
.send(b
"\x05\x01\x00\x01\x7f\x00\x00\x01" + ECHO_PORT_B
)
229 self
.assertRequestSuccess()
232 self
.destroy_socket()
234 class TestDNSProxy(SocketHelper
, ProxyPacketHelper
, unittest
.TestCase
):
235 def run(self
, result
=None):
237 unittest
.TestCase
.run(self
, result
)
242 self
.send(b
"\x05\x01\x00")
243 self
.assertAuthSuccess()
245 self
.send(b
"\x05\x01\x00\x03\x09localhost" + ECHO_PORT_B
)
246 self
.assertRequestSuccess()
249 self
.destroy_socket()
251 class TestIPv6Proxy(SocketHelper
, ProxyPacketHelper
, unittest
.TestCase
):
252 def run(self
, result
=None):
254 unittest
.TestCase
.run(self
, result
)
259 self
.send(b
"\x05\x01\x00")
260 self
.assertAuthSuccess()
262 self
.send(b
"\x05\x01\x00\x04" + (b
"\x00"*15) + b
"\x01" + ECHO_PORT_B
)
263 self
.assertRequestSuccess()
266 self
.destroy_socket()
268 class TestPermissions(SocketHelper
, unittest
.TestCase
):
269 def assert_connection_allowed(self
):
272 self
.send(b
"\x05\x01\x00")
273 self
.assertAuthSuccess()
275 self
.destroy_socket()
277 def assert_ipv6_connection_allowed(self
):
279 self
.init_ipv6_socket()
280 self
.send(b
"\x05\x01\x00")
281 self
.assertAuthSuccess()
283 self
.destroy_socket()
285 def assert_connection_blocked(self
):
288 self
.send(b
"\x05\x01\x00")
289 self
.assertAuthSuccess()
290 self
.fail("Expected ConnectionResetError")
291 except ConnectionResetError
:
294 self
.destroy_socket()
296 def test_allow_all_connections(self
):
297 with
SocksServer({"ALLOW_ALL": "1"}):
298 self
.assert_connection_allowed()
300 def test_block_all_connections(self
):
301 with
SocksServer({}):
302 self
.assert_connection_blocked()
304 def test_allow_ipv4_host(self
):
305 with
SocksServer({"ALLOW_HOST1": "127.0.0.1"}):
306 self
.assert_connection_allowed()
308 def test_allow_ipv6_host(self
):
309 with
SocksServer({"ALLOW_HOST1": "::1"}):
310 self
.assert_ipv6_connection_allowed()
312 def test_allow_multiple_hosts(self
):
313 with
SocksServer({"ALLOW_HOST1": "foo.invalid", "ALLOW_HOST2": "localhost"}):
314 self
.assert_connection_allowed()
317 class EchoServer(object):
321 self
.sock
= socket
.socket(socket
.AF_INET6
, socket
.SOCK_STREAM
)
322 self
.sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
323 self
.sock
.bind(("", ECHO_PORT
))
326 self
.thread
= threading
.Thread(target
=self
.run_echo_server
)
331 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
333 self
.sock
.shutdown(socket
.SHUT_RDWR
)
336 def run_echo_server(self
):
339 client
, addr
= self
.sock
.accept()
341 self
.handle_echo_client(client
)
348 def handle_echo_client(self
, client
):
349 line
= client
.recv(10+1)
352 num_bytes
= int(line
)
355 # force the test app to handle many packets by using small ones
356 data
= client
.recv(16)
359 num_bytes
-= len(data
)
361 l
= client
.send(data
)
364 class SocksServer(object):
365 def __init__(self
, extra_env
={"ALLOW_ALL": "1"}):
367 self
.env
["LISTEN_PORT"] = str(SOCKS_PORT
)
368 self
.env
.update(extra_env
)
369 self
.devnull
= open("/dev/null", "w")
372 self
.process
= subprocess
.Popen(
373 args
=["./socks5server"],
383 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
385 self
.process
.terminate()
388 def wait_for_port(self
):
389 start_time
= time
.time()
390 with socket
.socket(socket
.AF_INET
) as s
:
391 while start_time
+ 10 > time
.time():
393 s
.connect(("localhost", SOCKS_PORT
))
395 except ConnectionRefusedError
:
399 if __name__
== "__main__":