Skip to content

Modules

Main module.

LifxCloud

Class to represent LIFX Cloud access.

Source code in src/asyncio_lifx_scenes/scenes.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class LifxCloud:
    """Class to represent LIFX Cloud access."""

    def __init__(self, token: str) -> None:
        """Initialize the LIFX scene."""
        self.scenes: list[LifxScene] = []
        self._auth_token = HeaderApiKey(f"Bearer {token}", header_name="Authorization")

    def list_scenes(self) -> list[LifxScene]:
        """Return a list of scenes stored on LIFX Cloud."""
        header = {"accept": "application/json"}

        with httpx.Client(auth=self._auth_token, timeout=TIMEOUT) as client:
            resp = client.get(LIFX_URL, headers=header)
            try:
                resp.raise_for_status()
                for scene in resp.json():
                    self.scenes.append(LifxScene(**scene))
            except httpx.HTTPStatusError as exc:
                _LOGGER.exception("Error response %s from %s", exc.response.status_code, exc.request.url)
                return []
            else:
                return self.scenes

    async def async_list_scenes(self) -> list[LifxScene]:
        """Asynchronously return a list of scenes stored on LIFX Cloud."""
        header = {"accept": "application/json"}
        async with httpx.AsyncClient(auth=self._auth_token, timeout=TIMEOUT) as client:
            resp = await client.get(LIFX_URL, headers=header)
            try:
                resp.raise_for_status()
                for scene in resp.json():
                    self.scenes.append(LifxScene(**scene))
            except httpx.HTTPStatusError as exc:
                _LOGGER.exception("Error response %s from %s", exc.response.status_code, exc.request.url)
                return []
            else:
                return self.scenes

    def activate_scene(
        self, scene_uuid: str, duration: int = 1, ignore: list[str] | None = None, fast: bool = False
    ) -> list[dict[str, Any]] | None:
        """Activate a scene by UUID."""
        headers = {
            "accept": "application/json",
            "content-type": "application/json",
        }

        payload = {
            "duration": duration,
            "ignore": ignore if ignore is not None else [],
            "fast": fast,
        }

        with httpx.Client(auth=self._auth_token, timeout=TIMEOUT) as client:
            resp = client.put(f"{LIFX_URL}/scene_id:{scene_uuid}/activate", json=payload, headers=headers)
            results: list[dict[str, Any]] = []

            try:
                resp.raise_for_status()
                if fast:
                    return None
                results = resp.json().get("results", None)

            except httpx.HTTPStatusError as exc:
                _LOGGER.exception("Error response %s from %s", exc.response.status_code, exc.request.url)

            return results

    async def async_activate_scene(
        self, scene_uuid: str, duration: int = 1, ignore: list[str] | None = None, fast: bool = False
    ) -> list[dict[str, str]] | None:
        """Activate a scene by UUID."""
        headers = {
            "accept": "application/json",
            "content-type": "application/json",
        }

        payload = {
            "duration": duration,
            "ignore": ignore if ignore is not None else [],
            "fast": fast,
        }

        async with httpx.AsyncClient(auth=self._auth_token, timeout=TIMEOUT) as client:
            resp = await client.put(f"{LIFX_URL}/scene_id:{scene_uuid}/activate", json=payload, headers=headers)
            results: list[dict[str, str]] = []

            try:
                resp.raise_for_status()
                if fast:
                    return None
                results = resp.json().get("results", None)

            except httpx.HTTPStatusError as exc:
                _LOGGER.exception("Error response %s from %s", exc.response.status_code, exc.request.url)

            return results

__init__(token)

Initialize the LIFX scene.

Source code in src/asyncio_lifx_scenes/scenes.py
22
23
24
25
def __init__(self, token: str) -> None:
    """Initialize the LIFX scene."""
    self.scenes: list[LifxScene] = []
    self._auth_token = HeaderApiKey(f"Bearer {token}", header_name="Authorization")

activate_scene(scene_uuid, duration=1, ignore=None, fast=False)

Activate a scene by UUID.

Source code in src/asyncio_lifx_scenes/scenes.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def activate_scene(
    self, scene_uuid: str, duration: int = 1, ignore: list[str] | None = None, fast: bool = False
) -> list[dict[str, Any]] | None:
    """Activate a scene by UUID."""
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
    }

    payload = {
        "duration": duration,
        "ignore": ignore if ignore is not None else [],
        "fast": fast,
    }

    with httpx.Client(auth=self._auth_token, timeout=TIMEOUT) as client:
        resp = client.put(f"{LIFX_URL}/scene_id:{scene_uuid}/activate", json=payload, headers=headers)
        results: list[dict[str, Any]] = []

        try:
            resp.raise_for_status()
            if fast:
                return None
            results = resp.json().get("results", None)

        except httpx.HTTPStatusError as exc:
            _LOGGER.exception("Error response %s from %s", exc.response.status_code, exc.request.url)

        return results

async_activate_scene(scene_uuid, duration=1, ignore=None, fast=False) async

Activate a scene by UUID.

Source code in src/asyncio_lifx_scenes/scenes.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
async def async_activate_scene(
    self, scene_uuid: str, duration: int = 1, ignore: list[str] | None = None, fast: bool = False
) -> list[dict[str, str]] | None:
    """Activate a scene by UUID."""
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
    }

    payload = {
        "duration": duration,
        "ignore": ignore if ignore is not None else [],
        "fast": fast,
    }

    async with httpx.AsyncClient(auth=self._auth_token, timeout=TIMEOUT) as client:
        resp = await client.put(f"{LIFX_URL}/scene_id:{scene_uuid}/activate", json=payload, headers=headers)
        results: list[dict[str, str]] = []

        try:
            resp.raise_for_status()
            if fast:
                return None
            results = resp.json().get("results", None)

        except httpx.HTTPStatusError as exc:
            _LOGGER.exception("Error response %s from %s", exc.response.status_code, exc.request.url)

        return results

async_list_scenes() async

Asynchronously return a list of scenes stored on LIFX Cloud.

Source code in src/asyncio_lifx_scenes/scenes.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
async def async_list_scenes(self) -> list[LifxScene]:
    """Asynchronously return a list of scenes stored on LIFX Cloud."""
    header = {"accept": "application/json"}
    async with httpx.AsyncClient(auth=self._auth_token, timeout=TIMEOUT) as client:
        resp = await client.get(LIFX_URL, headers=header)
        try:
            resp.raise_for_status()
            for scene in resp.json():
                self.scenes.append(LifxScene(**scene))
        except httpx.HTTPStatusError as exc:
            _LOGGER.exception("Error response %s from %s", exc.response.status_code, exc.request.url)
            return []
        else:
            return self.scenes

list_scenes()

Return a list of scenes stored on LIFX Cloud.

Source code in src/asyncio_lifx_scenes/scenes.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def list_scenes(self) -> list[LifxScene]:
    """Return a list of scenes stored on LIFX Cloud."""
    header = {"accept": "application/json"}

    with httpx.Client(auth=self._auth_token, timeout=TIMEOUT) as client:
        resp = client.get(LIFX_URL, headers=header)
        try:
            resp.raise_for_status()
            for scene in resp.json():
                self.scenes.append(LifxScene(**scene))
        except httpx.HTTPStatusError as exc:
            _LOGGER.exception("Error response %s from %s", exc.response.status_code, exc.request.url)
            return []
        else:
            return self.scenes