diff --git a/moto/workspacesweb/models.py b/moto/workspacesweb/models.py index 2cd4612d4cf0..23c95af99a1b 100644 --- a/moto/workspacesweb/models.py +++ b/moto/workspacesweb/models.py @@ -9,6 +9,95 @@ from moto.utilities.utils import get_partition +class FakeUserSettings(BaseModel): + def __init__( + self, + additional_encryption_context: Any, + client_token: str, + cookie_synchronization_configuration: str, + copy_allowed: bool, + customer_managed_key: str, + deep_link_allowed: bool, + disconnect_timeout_in_minutes: int, + download_allowed: bool, + idle_disconnect_timeout_in_minutes: int, + paste_allowed: bool, + print_allowed: bool, + tags: Dict[str, str], + upload_allowed: bool, + region_name: str, + account_id: str, + ): + self.user_settings_id = str(uuid.uuid4()) + self.arn = self.arn_formatter(self.user_settings_id, account_id, region_name) + self.additional_encryption_context = additional_encryption_context + self.client_token = client_token + self.cookie_synchronization_configuration = cookie_synchronization_configuration + self.copy_allowed = "Enabled" if copy_allowed else "Disabled" + self.customer_managed_key = customer_managed_key + self.deep_link_allowed = "Enabled" if deep_link_allowed else "Disabled" + self.disconnect_timeout_in_minutes = disconnect_timeout_in_minutes + self.download_allowed = "Enabled" if download_allowed else "Disabled" + self.idle_disconnect_timeout_in_minutes = idle_disconnect_timeout_in_minutes + self.paste_allowed = "Enabled" if paste_allowed else "Disabled" + self.print_allowed = "Enabled" if print_allowed else "Disabled" + self.tags = tags + self.upload_allowed = "Enabled" if upload_allowed else "Disabled" + self.associated_portal_arns: List[str] = [] + + def arn_formatter(self, _id: str, account_id: str, region_name: str) -> str: + return f"arn:{get_partition(region_name)}:workspaces-web:{region_name}:{account_id}:user-settings/{_id}" + + def to_dict(self) -> Dict[str, Any]: + return { + "associatedPortalArns": self.associated_portal_arns, + "additionalEncryptionContext": self.additional_encryption_context, + "clientToken": self.client_token, + "cookieSynchronizationConfiguration": self.cookie_synchronization_configuration, + "copyAllowed": self.copy_allowed, + "customerManagedKey": self.customer_managed_key, + "deepLinkAllowed": self.deep_link_allowed, + "disconnectTimeoutInMinutes": self.disconnect_timeout_in_minutes, + "downloadAllowed": self.download_allowed, + "idleDisconnectTimeoutInMinutes": self.idle_disconnect_timeout_in_minutes, + "pasteAllowed": self.paste_allowed, + "printAllowed": self.print_allowed, + "tags": self.tags, + "uploadAllowed": self.upload_allowed, + "userSettingsArn": self.arn, + } + + +class FakeUserAccessLoggingSettings(BaseModel): + def __init__( + self, + client_token: str, + kinesis_stream_arn: str, + tags: Dict[str, str], + region_name: str, + account_id: str, + ): + self.user_access_logging_settings_id = str(uuid.uuid4()) + self.arn = self.arn_formatter( + self.user_access_logging_settings_id, account_id, region_name + ) + self.client_token = client_token + self.kinesis_stream_arn = kinesis_stream_arn + self.tags = tags + self.associated_portal_arns: List[str] = [] + + def arn_formatter(self, _id: str, account_id: str, region_name: str) -> str: + return f"arn:{get_partition(region_name)}:workspaces-web:{region_name}:{account_id}:user-access-logging-settings/{_id}" + + def to_dict(self) -> Dict[str, Any]: + return { + "associatedPortalArns": self.associated_portal_arns, + "kinesisStreamArn": self.kinesis_stream_arn, + "tags": self.tags, + "userAccessLoggingSettingsArn": self.arn, + } + + class FakeNetworkSettings(BaseModel): def __init__( self, @@ -147,6 +236,8 @@ def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.network_settings: Dict[str, FakeNetworkSettings] = {} self.browser_settings: Dict[str, FakeBrowserSettings] = {} + self.user_settings: Dict[str, FakeUserSettings] = {} + self.user_access_logging_settings: Dict[str, FakeUserAccessLoggingSettings] = {} self.portals: Dict[str, FakePortal] = {} def create_network_settings( @@ -286,5 +377,104 @@ def associate_network_settings( portal_object.network_settings_arn = network_settings_arn return network_settings_arn, portal_arn + def create_user_settings( + self, + additional_encryption_context: Any, + client_token: Any, + cookie_synchronization_configuration: Any, + copy_allowed: Any, + customer_managed_key: Any, + deep_link_allowed: Any, + disconnect_timeout_in_minutes: Any, + download_allowed: Any, + idle_disconnect_timeout_in_minutes: Any, + paste_allowed: Any, + print_allowed: Any, + tags: Any, + upload_allowed: Any, + ) -> str: + user_settings_object = FakeUserSettings( + additional_encryption_context, + client_token, + cookie_synchronization_configuration, + copy_allowed, + customer_managed_key, + deep_link_allowed, + disconnect_timeout_in_minutes, + download_allowed, + idle_disconnect_timeout_in_minutes, + paste_allowed, + print_allowed, + tags, + upload_allowed, + self.region_name, + self.account_id, + ) + self.user_settings[user_settings_object.arn] = user_settings_object + return user_settings_object.arn + + def get_user_settings(self, user_settings_arn: str) -> Dict[str, Any]: + return self.user_settings[user_settings_arn].to_dict() + + def delete_user_settings(self, user_settings_arn: str) -> None: + self.user_settings.pop(user_settings_arn) + + def create_user_access_logging_settings( + self, client_token: Any, kinesis_stream_arn: Any, tags: Any + ) -> str: + user_access_logging_settings_object = FakeUserAccessLoggingSettings( + client_token, kinesis_stream_arn, tags, self.region_name, self.account_id + ) + self.user_access_logging_settings[user_access_logging_settings_object.arn] = ( + user_access_logging_settings_object + ) + return user_access_logging_settings_object.arn + + def get_user_access_logging_settings( + self, user_access_logging_settings_arn: str + ) -> Dict[str, Any]: + return self.user_access_logging_settings[ + user_access_logging_settings_arn + ].to_dict() + + def delete_user_access_logging_settings( + self, user_access_logging_settings_arn: str + ) -> None: + self.user_access_logging_settings.pop(user_access_logging_settings_arn) + + def associate_user_settings( + self, portal_arn: str, user_settings_arn: str + ) -> Tuple[str, str]: + user_settings_object = self.user_settings[user_settings_arn] + portal_object = self.portals[portal_arn] + user_settings_object.associated_portal_arns.append(portal_arn) + portal_object.user_settings_arn = user_settings_arn + return portal_arn, user_settings_arn + + def associate_user_access_logging_settings( + self, portal_arn: str, user_access_logging_settings_arn: str + ) -> Tuple[str, str]: + user_access_logging_settings_object = self.user_access_logging_settings[ + user_access_logging_settings_arn + ] + portal_object = self.portals[portal_arn] + user_access_logging_settings_object.associated_portal_arns.append(portal_arn) + portal_object.user_access_logging_settings_arn = ( + user_access_logging_settings_arn + ) + return portal_arn, user_access_logging_settings_arn + + def list_user_settings(self) -> List[Dict[str, str]]: + return [ + {"userSettingsArn": user_settings.arn} + for user_settings in self.user_settings.values() + ] + + def list_user_access_logging_settings(self) -> List[Dict[str, str]]: + return [ + {"userAccessLoggingSettingsArn": user_access_logging_settings.arn} + for user_access_logging_settings in self.user_access_logging_settings.values() + ] + workspacesweb_backends = BackendDict(WorkSpacesWebBackend, "workspaces-web") diff --git a/moto/workspacesweb/responses.py b/moto/workspacesweb/responses.py index dcda234f3371..e4f43f4784af 100644 --- a/moto/workspacesweb/responses.py +++ b/moto/workspacesweb/responses.py @@ -38,6 +38,26 @@ def browser_settings(request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE else: return handler.delete_browser_settings() + @staticmethod + def user_settings(request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[misc] + handler = WorkSpacesWebResponse() + handler.setup_class(request, full_url, headers) + if request.method == "GET": + return handler.get_user_settings() + else: + return handler.delete_user_settings() + + @staticmethod + def user_access_logging_settings( # type: ignore[misc] + request: Any, full_url: str, headers: Any + ) -> TYPE_RESPONSE: + handler = WorkSpacesWebResponse() + handler.setup_class(request, full_url, headers) + if request.method == "GET": + return handler.get_user_access_logging_settings() + else: + return handler.delete_user_access_logging_settings() + @staticmethod def portal(request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[misc] handler = WorkSpacesWebResponse() @@ -183,3 +203,139 @@ def associate_network_settings(self) -> str: return json.dumps( dict(networkSettingsArn=network_settings_arn, portalArn=portal_arn) ) + + def create_user_settings(self) -> str: + params = self._get_params() + additional_encryption_context = params.get("additionalEncryptionContext") + client_token = params.get("clientToken") + cookie_synchronization_configuration = params.get( + "cookieSynchronizationConfiguration" + ) + copy_allowed = params.get("copyAllowed") + customer_managed_key = params.get("customerManagedKey") + deep_link_allowed = params.get("deepLinkAllowed") + disconnect_timeout_in_minutes = params.get("disconnectTimeoutInMinutes") + download_allowed = params.get("downloadAllowed") + idle_disconnect_timeout_in_minutes = params.get( + "idleDisconnectTimeoutInMinutes" + ) + paste_allowed = params.get("pasteAllowed") + print_allowed = params.get("printAllowed") + tags = params.get("tags") + upload_allowed = params.get("uploadAllowed") + user_settings_arn = self.workspacesweb_backend.create_user_settings( + additional_encryption_context=additional_encryption_context, + client_token=client_token, + cookie_synchronization_configuration=cookie_synchronization_configuration, + copy_allowed=copy_allowed, + customer_managed_key=customer_managed_key, + deep_link_allowed=deep_link_allowed, + disconnect_timeout_in_minutes=disconnect_timeout_in_minutes, + download_allowed=download_allowed, + idle_disconnect_timeout_in_minutes=idle_disconnect_timeout_in_minutes, + paste_allowed=paste_allowed, + print_allowed=print_allowed, + tags=tags, + upload_allowed=upload_allowed, + ) + return json.dumps(dict(userSettingsArn=user_settings_arn)) + + def get_user_settings(self) -> TYPE_RESPONSE: + user_settings_arn = unquote(self.parsed_url.path.split("/userSettings/")[-1]) + user_settings = self.workspacesweb_backend.get_user_settings( + user_settings_arn=user_settings_arn, + ) + return 200, {}, json.dumps(dict(userSettings=user_settings)) + + def delete_user_settings(self) -> TYPE_RESPONSE: + user_settings_arn = unquote(self.parsed_url.path.split("/userSettings/")[-1]) + self.workspacesweb_backend.delete_user_settings( + user_settings_arn=user_settings_arn, + ) + return 200, {}, "{}" + + def create_user_access_logging_settings(self) -> str: + params = self._get_params() + params = json.loads(list(params.keys())[0]) + client_token = params.get("clientToken") + kinesis_stream_arn = params.get("kinesisStreamArn") + tags = params.get("tags") + user_access_logging_settings_arn = ( + self.workspacesweb_backend.create_user_access_logging_settings( + client_token=client_token, + kinesis_stream_arn=kinesis_stream_arn, + tags=tags, + ) + ) + return json.dumps( + dict(userAccessLoggingSettingsArn=user_access_logging_settings_arn) + ) + + def get_user_access_logging_settings(self) -> TYPE_RESPONSE: + user_access_logging_settings_arn = unquote( + self.parsed_url.path.split("/userAccessLoggingSettings/")[-1] + ) + user_access_logging_settings = ( + self.workspacesweb_backend.get_user_access_logging_settings( + user_access_logging_settings_arn=user_access_logging_settings_arn, + ) + ) + return ( + 200, + {}, + json.dumps(dict(userAccessLoggingSettings=user_access_logging_settings)), + ) + + def delete_user_access_logging_settings(self) -> TYPE_RESPONSE: + user_access_logging_settings_arn = unquote( + self.parsed_url.path.split("/userAccessLoggingSettings/")[-1] + ) + self.workspacesweb_backend.delete_user_access_logging_settings( + user_access_logging_settings_arn=user_access_logging_settings_arn, + ) + return 200, {}, "{}" + + def associate_user_settings(self) -> str: + user_settings_arn = unquote(self._get_param("userSettingsArn")) + portal_arn = unquote( + self.parsed_url.path.split("/portals/")[-1].split("/userSettings")[0] + ) + user_settings_arn, portal_arn = ( + self.workspacesweb_backend.associate_user_settings( + user_settings_arn=user_settings_arn, + portal_arn=portal_arn, + ) + ) + return json.dumps(dict(userSettingsArn=user_settings_arn, portalArn=portal_arn)) + + def associate_user_access_logging_settings(self) -> str: + user_access_logging_settings_arn = unquote( + self._get_param("userAccessLoggingSettingsArn") + ) + portal_arn = unquote( + self.parsed_url.path.split("/portals/")[-1].split( + "/userAccessLoggingSettings" + )[0] + ) + user_access_logging_settings_arn, portal_arn = ( + self.workspacesweb_backend.associate_user_access_logging_settings( + user_access_logging_settings_arn=user_access_logging_settings_arn, + portal_arn=portal_arn, + ) + ) + return json.dumps( + dict( + userAccessLoggingSettingsArn=user_access_logging_settings_arn, + portalArn=portal_arn, + ) + ) + + def list_user_settings(self) -> str: + user_settings = self.workspacesweb_backend.list_user_settings() + return json.dumps(dict(userSettings=user_settings)) + + def list_user_access_logging_settings(self) -> str: + user_access_logging_settings = ( + self.workspacesweb_backend.list_user_access_logging_settings() + ) + return json.dumps(dict(userAccessLoggingSettings=user_access_logging_settings)) diff --git a/moto/workspacesweb/urls.py b/moto/workspacesweb/urls.py index 6fe82a99ee3c..8c2fb156af48 100644 --- a/moto/workspacesweb/urls.py +++ b/moto/workspacesweb/urls.py @@ -9,10 +9,16 @@ url_paths = { "{0}/browserSettings$": WorkSpacesWebResponse.dispatch, "{0}/networkSettings$": WorkSpacesWebResponse.dispatch, + "{0}/userSettings$": WorkSpacesWebResponse.dispatch, + "{0}/userAccessLoggingSettings$": WorkSpacesWebResponse.dispatch, "{0}/portals$": WorkSpacesWebResponse.dispatch, + "{0}/portals/(?P[^/]+)portal/(?P[^/]+)$": WorkSpacesWebResponse.portal, "{0}/browserSettings/(?P.+)$": WorkSpacesWebResponse.browser_settings, "{0}/networkSettings/(?P.+)$": WorkSpacesWebResponse.network_settings, - "{0}/portals/(?P[^/]+)portal/(?P[^/]+)$": WorkSpacesWebResponse.portal, + "{0}/userSettings/(?P.+)$": WorkSpacesWebResponse.user_settings, + "{0}/userAccessLoggingSettings/(?P.+)$": WorkSpacesWebResponse.user_access_logging_settings, "{0}/portals/(?P.*)/browserSettings$": WorkSpacesWebResponse.dispatch, "{0}/portals/(?P.*)/networkSettings$": WorkSpacesWebResponse.dispatch, + "{0}/portals/(?P.*)/userSettings$": WorkSpacesWebResponse.dispatch, + "{0}/portals/(?P.*)/userAccessLoggingSettings$": WorkSpacesWebResponse.dispatch, } diff --git a/tests/test_workspacesweb/test_workspacesweb.py b/tests/test_workspacesweb/test_workspacesweb.py index 854a7599b76b..e9f89deb1c07 100644 --- a/tests/test_workspacesweb/test_workspacesweb.py +++ b/tests/test_workspacesweb/test_workspacesweb.py @@ -269,3 +269,170 @@ def test_associate_network_settings(): "networkSettings" ] assert resp["associatedPortalArns"] == [portal_arn] + + +@mock_aws +def test_create_user_settings(): + client = boto3.client("workspaces-web", region_name="eu-west-1") + resp = client.create_user_settings( + copyAllowed="Disabled", + pasteAllowed="Disabled", + printAllowed="Disabled", + uploadAllowed="Disabled", + downloadAllowed="Disabled", + ) + user_settings_arn = resp["userSettingsArn"] + arn_regex = r"^arn:aws:workspaces-web:eu-west-1:\d{12}:user-settings/[a-f0-9-]+$" + assert re.match(arn_regex, user_settings_arn) is not None + + +@mock_aws +def test_get_user_settings(): + client = boto3.client("workspaces-web", region_name="eu-west-1") + resp = client.create_user_settings( + copyAllowed="Disabled", + pasteAllowed="Disabled", + printAllowed="Disabled", + uploadAllowed="Disabled", + downloadAllowed="Disabled", + ) + user_settings_arn = resp["userSettingsArn"] + resp = client.get_user_settings(userSettingsArn=user_settings_arn)["userSettings"] + assert resp["userSettingsArn"] == user_settings_arn + assert resp["copyAllowed"] == "Disabled" + assert resp["pasteAllowed"] == "Disabled" + assert resp["printAllowed"] == "Disabled" + assert resp["uploadAllowed"] == "Disabled" + assert resp["downloadAllowed"] == "Disabled" + + +@mock_aws +def test_list_user_settings(): + client = boto3.client("workspaces-web", region_name="ap-southeast-1") + arn = client.create_user_settings( + copyAllowed="Disabled", + pasteAllowed="Disabled", + printAllowed="Disabled", + uploadAllowed="Disabled", + downloadAllowed="Disabled", + )["userSettingsArn"] + resp = client.list_user_settings() + assert resp["userSettings"][0]["userSettingsArn"] == arn + + +@mock_aws +def test_delete_user_settings(): + client = boto3.client("workspaces-web", region_name="eu-west-1") + arn = client.create_user_settings( + copyAllowed="Disabled", + pasteAllowed="Disabled", + printAllowed="Disabled", + uploadAllowed="Disabled", + downloadAllowed="Disabled", + )["userSettingsArn"] + client.delete_user_settings(userSettingsArn=arn) + assert len(client.list_user_settings()["userSettings"]) == 0 + + +@mock_aws +def test_associate_user_settings(): + client = boto3.client("workspaces-web", region_name="eu-west-1") + user_settings_arn = client.create_user_settings( + copyAllowed="Disabled", + pasteAllowed="Disabled", + printAllowed="Disabled", + uploadAllowed="Disabled", + downloadAllowed="Disabled", + )["userSettingsArn"] + portal_arn = client.create_portal( + additionalEncryptionContext={"Key1": "Value1", "Key2": "Value2"}, + authenticationType="Standard", + clientToken="TestClient", + customerManagedKey=FAKE_KMS_KEY_ID, + displayName="TestDisplayName", + instanceType="TestInstanceType", + maxConcurrentSessions=5, + tags=FAKE_TAGS, + )["portalArn"] + client.associate_user_settings( + userSettingsArn=user_settings_arn, portalArn=portal_arn + ) + resp = client.get_portal(portalArn=portal_arn)["portal"] + assert resp["userSettingsArn"] == user_settings_arn + resp = client.get_user_settings(userSettingsArn=user_settings_arn)["userSettings"] + assert resp["associatedPortalArns"] == [portal_arn] + + +@mock_aws +def test_create_user_access_logging_settings(): + client = boto3.client("workspaces-web", region_name="ap-southeast-1") + user_access_logging_settings_arn = client.create_user_access_logging_settings( + kinesisStreamArn="arn:aws:kinesis:ap-southeast-1:123456789012:stream/TestStream", + )["userAccessLoggingSettingsArn"] + arn_regex = r"^arn:aws:workspaces-web:ap-southeast-1:\d{12}:user-access-logging-settings/[a-f0-9-]+$" + assert re.match(arn_regex, user_access_logging_settings_arn) is not None + + +@mock_aws +def test_get_user_access_logging_settings(): + client = boto3.client("workspaces-web", region_name="eu-west-1") + user_access_logging_settings_arn = client.create_user_access_logging_settings( + kinesisStreamArn="arn:aws:kinesis:ap-southeast-1:123456789012:stream/TestStream", + )["userAccessLoggingSettingsArn"] + resp = client.get_user_access_logging_settings( + userAccessLoggingSettingsArn=user_access_logging_settings_arn + )["userAccessLoggingSettings"] + assert resp["userAccessLoggingSettingsArn"] == user_access_logging_settings_arn + assert ( + resp["kinesisStreamArn"] + == "arn:aws:kinesis:ap-southeast-1:123456789012:stream/TestStream" + ) + + +@mock_aws +def test_list_user_access_logging_settings(): + client = boto3.client("workspaces-web", region_name="eu-west-1") + arn = client.create_user_access_logging_settings( + kinesisStreamArn="arn:aws:kinesis:ap-southeast-1:123456789012:stream/TestStream", + )["userAccessLoggingSettingsArn"] + resp = client.list_user_access_logging_settings() + assert resp["userAccessLoggingSettings"][0]["userAccessLoggingSettingsArn"] == arn + + +@mock_aws +def test_delete_user_access_logging_settings(): + client = boto3.client("workspaces-web", region_name="ap-southeast-1") + arn = client.create_user_access_logging_settings( + kinesisStreamArn="arn:aws:kinesis:ap-southeast-1:123456789012:stream/TestStream", + )["userAccessLoggingSettingsArn"] + client.delete_user_access_logging_settings(userAccessLoggingSettingsArn=arn) + resp = client.list_user_access_logging_settings() + assert resp["userAccessLoggingSettings"] == [] + + +@mock_aws +def test_associate_user_access_logging_settings(): + client = boto3.client("workspaces-web", region_name="eu-west-1") + user_access_logging_settings_arn = client.create_user_access_logging_settings( + kinesisStreamArn="arn:aws:kinesis:ap-southeast-1:123456789012:stream/TestStream", + )["userAccessLoggingSettingsArn"] + portal_arn = client.create_portal( + additionalEncryptionContext={"Key1": "Value1", "Key2": "Value2"}, + authenticationType="Standard", + clientToken="TestClient", + customerManagedKey=FAKE_KMS_KEY_ID, + displayName="TestDisplayName", + instanceType="TestInstanceType", + maxConcurrentSessions=5, + tags=FAKE_TAGS, + )["portalArn"] + client.associate_user_access_logging_settings( + userAccessLoggingSettingsArn=user_access_logging_settings_arn, + portalArn=portal_arn, + ) + resp = client.get_portal(portalArn=portal_arn)["portal"] + assert resp["userAccessLoggingSettingsArn"] == user_access_logging_settings_arn + resp = client.get_user_access_logging_settings( + userAccessLoggingSettingsArn=user_access_logging_settings_arn + )["userAccessLoggingSettings"] + assert resp["associatedPortalArns"] == [portal_arn]