- def __init__(self, rfcomm_id, bt_addr, bt_channel, apn):
- self.rfcomm_id = rfcomm_id
- self.bt_addr = bt_addr
- self.bt_channel = bt_channel
- self.apn = apn
-
- self.rfcomm = None
- self.wvdial = None
- self.wvdial_conf_name = None
- self.dbus_system = None
-
- def release(self):
- if self.wvdial:
- try:
- self.wvdial.terminate()
- self.wvdial.wait()
- except Exception as e:
- print(e)
-
- if self.rfcomm:
- try:
- self.rfcomm.terminate()
- self.rfcomm.wait()
- except Exception as e:
- print(e)
-
- if self.wvdial_conf_name:
- try:
- os.unlink(self.wvdial_conf_name)
- except Exception as e:
- print(e)
-
- try:
- reset_rfcomm(self.rfcomm_id)
- except Exception as e:
- print(e)
-
- if self.dbus_system:
- try:
- self.disconnect_bluetooth()
- except Exception as e:
- print(e)
-
-
- def setup_dbus(self):
- self.dbus_system = dbus.SystemBus()
-
- def enable_bluetooth(self):
- bluez = self.dbus_system.get_object("org.bluez", "/org/bluez/hci0")
- iprops = dbus.Interface(bluez, "org.freedesktop.DBus.Properties")
- iprops.Set("org.bluez.Adapter1", "Powered", True)
-
- def disconnect_bluetooth(self):
- path = self.bt_addr.upper().replace(":", "_")
- bluez_dev = self.dbus_system.get_object("org.bluez", "/org/bluez/hci0/dev_" + path)
- idev = dbus.Interface(bluez_dev, "org.bluez.Device1")
- idev.Disconnect()
-
- def connect_rfcomm(self):
- self.rfcomm = subprocess.Popen([
- "rfcomm",
- "connect",
- self.rfcomm_id,
- self.bt_addr,
- self.bt_channel,
- ])
-
- # poll until connected
- start = time.time()
- while time.time() - start < 10:
- if self.is_rfcomm_connected():
- return
- time.sleep(0.1)
- raise DiallerException("Timeout connecting rfcomm")
-
- def is_rfcomm_connected(self):
- output = subprocess.check_output(["rfcomm", "-a"])
- for line in output.decode("ascii").split("\n"):
- if not line.startswith("rfcomm%s:" % self.rfcomm_id):
- continue
- if line.find(" connected ") >= 0:
- return True
- return False
-
- def write_wvdial_conf(self):
- fd, self.wvdial_conf_name = tempfile.mkstemp()
- f = os.fdopen(fd, "w")
- f.write("""
+ def __init__(self, rfcomm_id, bt_addr, bt_channel, apn):
+ self.rfcomm_id = rfcomm_id
+ self.bt_addr = bt_addr
+ self.bt_channel = bt_channel
+ self.apn = apn
+
+ self.rfcomm = None
+ self.wvdial = None
+ self.wvdial_conf_name = None
+ self.dbus_system = None
+
+ def release(self):
+ if self.wvdial:
+ try:
+ self.wvdial.terminate()
+ self.wvdial.wait()
+ except Exception as e:
+ print(e)
+
+ if self.rfcomm:
+ try:
+ self.rfcomm.terminate()
+ self.rfcomm.wait()
+ except Exception as e:
+ print(e)
+
+ if self.wvdial_conf_name:
+ try:
+ os.unlink(self.wvdial_conf_name)
+ except Exception as e:
+ print(e)
+
+ try:
+ reset_rfcomm(self.rfcomm_id)
+ except Exception as e:
+ print(e)
+
+ if self.dbus_system:
+ try:
+ self.disconnect_bluetooth()
+ except Exception as e:
+ print(e)
+
+
+ def setup_dbus(self):
+ self.dbus_system = dbus.SystemBus()
+
+ def enable_bluetooth(self):
+ bluez = self.dbus_system.get_object("org.bluez", "/org/bluez/hci0")
+ iprops = dbus.Interface(bluez, "org.freedesktop.DBus.Properties")
+ iprops.Set("org.bluez.Adapter1", "Powered", True)
+
+ def disconnect_bluetooth(self):
+ path = self.bt_addr.upper().replace(":", "_")
+ bluez_dev = self.dbus_system.get_object("org.bluez", "/org/bluez/hci0/dev_" + path)
+ idev = dbus.Interface(bluez_dev, "org.bluez.Device1")
+ idev.Disconnect()
+
+ def connect_rfcomm(self):
+ self.rfcomm = subprocess.Popen([
+ "rfcomm",
+ "connect",
+ self.rfcomm_id,
+ self.bt_addr,
+ self.bt_channel,
+ ])
+
+ # poll until connected
+ start = time.time()
+ while time.time() - start < 10:
+ if self.is_rfcomm_connected():
+ return
+ time.sleep(0.1)
+ raise DiallerException("Timeout connecting rfcomm")
+
+ def is_rfcomm_connected(self):
+ output = subprocess.check_output(["rfcomm", "-a"])
+ for line in output.decode("ascii").split("\n"):
+ if not line.startswith("rfcomm%s:" % self.rfcomm_id):
+ continue
+ if line.find(" connected ") >= 0:
+ return True
+ return False
+
+ def write_wvdial_conf(self):
+ fd, self.wvdial_conf_name = tempfile.mkstemp()
+ f = os.fdopen(fd, "w")
+ f.write("""