Found 29 repositories(showing 29)
SamVerschueren
A Map implementation with expirable items
michealbalogun
Copyright 2012 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. # # Copyright 2012 Nebula, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import glob import logging import os import sys import warnings from django.utils.translation import pgettext_lazy from django.utils.translation import ugettext_lazy as _ from horizon.utils.escape import monkeypatch_escape from openstack_dashboard import enabled from openstack_dashboard import exceptions from openstack_dashboard.local import enabled as local_enabled from openstack_dashboard import theme_settings from openstack_dashboard.utils import config from openstack_dashboard.utils import settings as settings_utils monkeypatch_escape() _LOG = logging.getLogger(__name__) warnings.formatwarning = lambda message, category, *args, **kwargs: \ '%s: %s' % (category.__name__, message) ROOT_PATH = os.path.dirname(os.path.abspath(__file__)) if ROOT_PATH not in sys.path: sys.path.append(ROOT_PATH) DEBUG = False SITE_BRANDING = 'OpenStack Dashboard' WEBROOT = '/' LOGIN_URL = None LOGOUT_URL = None LOGIN_ERROR = None LOGIN_REDIRECT_URL = None MEDIA_ROOT = None MEDIA_URL = None STATIC_ROOT = None STATIC_URL = None SELECTABLE_THEMES = None INTEGRATION_TESTS_SUPPORT = False NG_TEMPLATE_CACHE_AGE = 2592000 ROOT_URLCONF = 'openstack_dashboard.urls' HORIZON_CONFIG = { 'user_home': 'openstack_dashboard.views.get_user_home', 'ajax_queue_limit': 10, 'auto_fade_alerts': { 'delay': 3000, 'fade_duration': 1500, 'types': ['alert-success', 'alert-info'] }, 'bug_url': None, 'help_url': "https://docs.openstack.org/", 'exceptions': {'recoverable': exceptions.RECOVERABLE, 'not_found': exceptions.NOT_FOUND, 'unauthorized': exceptions.UNAUTHORIZED}, 'modal_backdrop': 'static', 'angular_modules': [], 'js_files': [], 'js_spec_files': [], 'external_templates': [], 'plugins': [], 'integration_tests_support': INTEGRATION_TESTS_SUPPORT } # The OPENSTACK_IMAGE_BACKEND settings can be used to customize features # in the OpenStack Dashboard related to the Image service, such as the list # of supported image formats. OPENSTACK_IMAGE_BACKEND = { 'image_formats': [ ('', _('Select format')), ('aki', _('AKI - Amazon Kernel Image')), ('ami', _('AMI - Amazon Machine Image')), ('ari', _('ARI - Amazon Ramdisk Image')), ('docker', _('Docker')), ('iso', _('ISO - Optical Disk Image')), ('ova', _('OVA - Open Virtual Appliance')), ('ploop', _('PLOOP - Virtuozzo/Parallels Loopback Disk')), ('qcow2', _('QCOW2 - QEMU Emulator')), ('raw', _('Raw')), ('vdi', _('VDI - Virtual Disk Image')), ('vhd', _('VHD - Virtual Hard Disk')), ('vhdx', _('VHDX - Large Virtual Hard Disk')), ('vmdk', _('VMDK - Virtual Machine Disk')), ] } MIDDLEWARE = ( 'openstack_auth.middleware.OpenstackAuthMonkeyPatchMiddleware', 'debreach.middleware.RandomCommentMiddleware', 'django.middleware.common.CommonMiddleware', 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'horizon.middleware.OperationLogMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', 'horizon.middleware.HorizonMiddleware', 'horizon.themes.ThemeMiddleware', 'django.middleware.locale.LocaleMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware', 'openstack_dashboard.contrib.developer.profiler.middleware.' 'ProfilerClientMiddleware', 'openstack_dashboard.contrib.developer.profiler.middleware.' 'ProfilerMiddleware', ) CACHED_TEMPLATE_LOADERS = [ 'django.template.loaders.filesystem.Loader', 'django.template.loaders.app_directories.Loader', 'horizon.loaders.TemplateLoader' ] ADD_TEMPLATE_LOADERS = [] ADD_TEMPLATE_DIRS = [] TEMPLATES = [ { 'BACKEND': 'django.template.backends.django.DjangoTemplates', 'DIRS': [os.path.join(ROOT_PATH, 'templates')], 'OPTIONS': { 'context_processors': [ 'django.template.context_processors.debug', 'django.template.context_processors.i18n', 'django.template.context_processors.request', 'django.template.context_processors.media', 'django.template.context_processors.static', 'django.contrib.messages.context_processors.messages', 'horizon.context_processors.horizon', 'openstack_dashboard.context_processors.openstack', ], 'loaders': [ 'horizon.themes.ThemeTemplateLoader' ], }, }, ] STATICFILES_FINDERS = ( 'django.contrib.staticfiles.finders.FileSystemFinder', 'horizon.contrib.staticfiles.finders.HorizonStaticFinder', 'compressor.finders.CompressorFinder', ) COMPRESS_PRECOMPILERS = ( ('text/scss', 'horizon.utils.scss_filter.HorizonScssFilter'), ) COMPRESS_CSS_FILTERS = ( 'compressor.filters.css_default.CssAbsoluteFilter', ) COMPRESS_ENABLED = True COMPRESS_OUTPUT_DIR = 'dashboard' COMPRESS_CSS_HASHING_METHOD = 'hash' COMPRESS_PARSER = 'compressor.parser.HtmlParser' INSTALLED_APPS = [ 'openstack_dashboard', 'django.contrib.contenttypes', 'django.contrib.auth', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'django.contrib.humanize', 'django_pyscss', 'debreach', 'openstack_dashboard.django_pyscss_fix', 'compressor', 'horizon', 'openstack_auth', ] AUTHENTICATION_BACKENDS = ('openstack_auth.backend.KeystoneBackend',) AUTHENTICATION_URLS = ['openstack_auth.urls'] AUTH_USER_MODEL = 'openstack_auth.User' MESSAGE_STORAGE = 'django.contrib.messages.storage.fallback.FallbackStorage' SESSION_ENGINE = 'django.contrib.sessions.backends.cache' CACHES = { 'default': { 'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache', 'LOCATION': '127.0.0.1:11211', }, } SESSION_COOKIE_HTTPONLY = True SESSION_EXPIRE_AT_BROWSER_CLOSE = True SESSION_COOKIE_SECURE = False # Control whether the SESSION_TIMEOUT period is refreshed due to activity. If # False, SESSION_TIMEOUT acts as a hard limit. SESSION_REFRESH = True # This SESSION_TIMEOUT is a method to supercede the token timeout with a # shorter horizon session timeout (in seconds). If SESSION_REFRESH is True (the # default) SESSION_TIMEOUT acts like an idle timeout rather than being a hard # limit, but will never exceed the token expiry. If your token expires in 60 # minutes, a value of 1800 will log users out after 30 minutes of inactivity, # or 60 minutes with activity. Setting SESSION_REFRESH to False will make # SESSION_TIMEOUT act like a hard limit on session times. SESSION_TIMEOUT = 3600 # When using cookie-based sessions, log error when the session cookie exceeds # the following size (common browsers drop cookies above a certain size): SESSION_COOKIE_MAX_SIZE = 4093 # when doing upgrades, it may be wise to stick to PickleSerializer # NOTE(berendt): Check during the K-cycle if this variable can be removed. # https://bugs.launchpad.net/horizon/+bug/1349463 SESSION_SERIALIZER = 'django.contrib.sessions.serializers.PickleSerializer' # MEMOIZED_MAX_SIZE_DEFAULT allows setting a global default to help control # memory usage when caching. It should at least be 2 x the number of threads # with a little bit of extra buffer. MEMOIZED_MAX_SIZE_DEFAULT = 25 CSRF_FAILURE_VIEW = 'openstack_dashboard.views.csrf_failure' LANGUAGES = ( ('cs', 'Czech'), ('de', 'German'), ('en', 'English'), ('en-au', 'Australian English'), ('en-gb', 'British English'), ('eo', 'Esperanto'), ('es', 'Spanish'), ('fr', 'French'), ('id', 'Indonesian'), ('it', 'Italian'), ('ja', 'Japanese'), ('ko', 'Korean (Korea)'), ('pl', 'Polish'), ('pt-br', 'Portuguese (Brazil)'), ('ru', 'Russian'), ('tr', 'Turkish'), ('zh-cn', 'Simplified Chinese'), ('zh-tw', 'Chinese (Taiwan)'), ) LANGUAGE_CODE = 'en' LANGUAGE_COOKIE_NAME = 'horizon_language' USE_I18N = True USE_L10N = True USE_TZ = True # Set OPENSTACK_CLOUDS_YAML_NAME to provide a nicer name for this cloud for # the clouds.yaml file than "openstack". OPENSTACK_CLOUDS_YAML_NAME = 'openstack' # If this cloud has a vendor profile in os-client-config, put it's name here. OPENSTACK_CLOUDS_YAML_PROFILE = '' OPENSTACK_KEYSTONE_DEFAULT_ROLE = '_member_' DEFAULT_EXCEPTION_REPORTER_FILTER = 'horizon.exceptions.HorizonReporterFilter' POLICY_FILES_PATH = os.path.join(ROOT_PATH, "conf") # Map of local copy of service policy files POLICY_FILES = { 'identity': 'keystone_policy.json', 'compute': 'nova_policy.json', 'volume': 'cinder_policy.json', 'image': 'glance_policy.json', 'network': 'neutron_policy.json', } # Services for which horizon has extra policies are defined # in POLICY_DIRS by default. POLICY_DIRS = { 'compute': ['nova_policy.d'], 'volume': ['cinder_policy.d'], } SECRET_KEY = None LOCAL_PATH = None SECURITY_GROUP_RULES = { 'all_tcp': { 'name': _('All TCP'), 'ip_protocol': 'tcp', 'from_port': '1', 'to_port': '65535', }, 'all_udp': { 'name': _('All UDP'), 'ip_protocol': 'udp', 'from_port': '1', 'to_port': '65535', }, 'all_icmp': { 'name': _('All ICMP'), 'ip_protocol': 'icmp', 'from_port': '-1', 'to_port': '-1', }, } ADD_INSTALLED_APPS = [] # NOTE: The default value of USER_MENU_LINKS will be set after loading # local_settings if it is not configured. USER_MENU_LINKS = None # 'key', 'label', 'path' AVAILABLE_THEMES = [ ( 'default', pgettext_lazy('Default style theme', 'Default'), 'themes/default' ), ( 'material', pgettext_lazy("Google's Material Design style theme", "Material"), 'themes/material' ), ] # The default theme if no cookie is present DEFAULT_THEME = 'default' # Theme Static Directory THEME_COLLECTION_DIR = 'themes' # Theme Cookie Name THEME_COOKIE_NAME = 'theme' POLICY_CHECK_FUNCTION = 'openstack_auth.policy.check' CSRF_COOKIE_AGE = None COMPRESS_OFFLINE_CONTEXT = 'horizon.themes.offline_context' SHOW_KEYSTONE_V2_RC = False SHOW_OPENRC_FILE = True SHOW_OPENSTACK_CLOUDS_YAML = True # Dictionary of currently available angular features ANGULAR_FEATURES = { 'images_panel': True, 'key_pairs_panel': True, 'flavors_panel': False, 'domains_panel': False, 'users_panel': False, 'groups_panel': False, 'roles_panel': True } # Notice all customizable configurations should be above this line XSTATIC_MODULES = settings_utils.BASE_XSTATIC_MODULES OPENSTACK_PROFILER = { 'enabled': False } if not LOCAL_PATH: LOCAL_PATH = os.path.join(ROOT_PATH, 'local') LOCAL_SETTINGS_DIR_PATH = os.path.join(LOCAL_PATH, "local_settings.d") _files = glob.glob(os.path.join(LOCAL_PATH, 'local_settings.conf')) _files.extend( sorted(glob.glob(os.path.join(LOCAL_SETTINGS_DIR_PATH, '*.conf')))) _config = config.load_config(_files, ROOT_PATH, LOCAL_PATH) # Apply the general configuration. config.apply_config(_config, globals()) try: from local.local_settings import * # noqa: F403,H303 except ImportError: _LOG.warning("No local_settings file found.") # configure templates if not TEMPLATES[0]['DIRS']: TEMPLATES[0]['DIRS'] = [os.path.join(ROOT_PATH, 'templates')] TEMPLATES[0]['DIRS'] += ADD_TEMPLATE_DIRS # configure template debugging TEMPLATES[0]['OPTIONS']['debug'] = DEBUG # Template loaders if DEBUG: TEMPLATES[0]['OPTIONS']['loaders'].extend( CACHED_TEMPLATE_LOADERS + ADD_TEMPLATE_LOADERS ) else: TEMPLATES[0]['OPTIONS']['loaders'].extend( [('django.template.loaders.cached.Loader', CACHED_TEMPLATE_LOADERS)] + ADD_TEMPLATE_LOADERS ) # allow to drop settings snippets into a local_settings_dir LOCAL_SETTINGS_DIR_PATH = os.path.join(ROOT_PATH, "local", "local_settings.d") if os.path.exists(LOCAL_SETTINGS_DIR_PATH): for (dirpath, dirnames, filenames) in os.walk(LOCAL_SETTINGS_DIR_PATH): for filename in sorted(filenames): if filename.endswith(".py"): try: with open(os.path.join(dirpath, filename)) as f: # pylint: disable=exec-used exec(f.read()) except Exception as e: _LOG.exception( "Can not exec settings snippet %s", filename) # The purpose of OPENSTACK_IMAGE_FORMATS is to provide a simple object # that does not contain the lazy-loaded translations, so the list can # be sent as JSON to the client-side (Angular). OPENSTACK_IMAGE_FORMATS = [fmt for (fmt, name) in OPENSTACK_IMAGE_BACKEND['image_formats']] if USER_MENU_LINKS is None: USER_MENU_LINKS = [] if SHOW_KEYSTONE_V2_RC: USER_MENU_LINKS.append({ 'name': _('OpenStack RC File v2'), 'icon_classes': ['fa-download', ], 'url': 'horizon:project:api_access:openrcv2', }) if SHOW_OPENRC_FILE: USER_MENU_LINKS.append({ 'name': (_('OpenStack RC File v3') if SHOW_KEYSTONE_V2_RC else _('OpenStack RC File')), 'icon_classes': ['fa-download', ], 'url': 'horizon:project:api_access:openrc', }) if not WEBROOT.endswith('/'): WEBROOT += '/' if LOGIN_URL is None: LOGIN_URL = WEBROOT + 'auth/login/' if LOGOUT_URL is None: LOGOUT_URL = WEBROOT + 'auth/logout/' if LOGIN_ERROR is None: LOGIN_ERROR = WEBROOT + 'auth/error/' if LOGIN_REDIRECT_URL is None: LOGIN_REDIRECT_URL = WEBROOT if MEDIA_ROOT is None: MEDIA_ROOT = os.path.abspath(os.path.join(ROOT_PATH, '..', 'media')) if MEDIA_URL is None: MEDIA_URL = WEBROOT + 'media/' if STATIC_ROOT is None: STATIC_ROOT = os.path.abspath(os.path.join(ROOT_PATH, '..', 'static')) if STATIC_URL is None: STATIC_URL = WEBROOT + 'static/' AVAILABLE_THEMES, SELECTABLE_THEMES, DEFAULT_THEME = ( theme_settings.get_available_themes( AVAILABLE_THEMES, DEFAULT_THEME, SELECTABLE_THEMES ) ) # Discover all the directories that contain static files STATICFILES_DIRS = theme_settings.get_theme_static_dirs( AVAILABLE_THEMES, THEME_COLLECTION_DIR, ROOT_PATH) # Ensure that we always have a SECRET_KEY set, even when no local_settings.py # file is present. See local_settings.py.example for full documentation on the # horizon.utils.secret_key module and its use. if not SECRET_KEY: if not LOCAL_PATH: LOCAL_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'local') # pylint: disable=ungrouped-imports from horizon.utils import secret_key SECRET_KEY = secret_key.generate_or_read_from_file(os.path.join(LOCAL_PATH, '.secret_key_store')) # populate HORIZON_CONFIG with auto-discovered JavaScript sources, mock files, # specs files and external templates. settings_utils.find_static_files(HORIZON_CONFIG, AVAILABLE_THEMES, THEME_COLLECTION_DIR, ROOT_PATH) INSTALLED_APPS = list(INSTALLED_APPS) # Make sure it's mutable settings_utils.update_dashboards( [ enabled, local_enabled, ], HORIZON_CONFIG, INSTALLED_APPS, ) INSTALLED_APPS[0:0] = ADD_INSTALLED_APPS NG_TEMPLATE_CACHE_AGE = NG_TEMPLATE_CACHE_AGE if not DEBUG else 0 # Include xstatic_modules specified in plugin XSTATIC_MODULES += HORIZON_CONFIG['xstatic_modules'] # Discover all the xstatic module entry points to embed in our HTML STATICFILES_DIRS += settings_utils.get_xstatic_dirs( XSTATIC_MODULES, HORIZON_CONFIG) # This base context objects gets added to the offline context generator # for each theme configured. HORIZON_COMPRESS_OFFLINE_CONTEXT_BASE = { 'WEBROOT': WEBROOT, 'STATIC_URL': STATIC_URL, 'HORIZON_CONFIG': HORIZON_CONFIG, 'NG_TEMPLATE_CACHE_AGE': NG_TEMPLATE_CACHE_AGE, } if DEBUG: logging.basicConfig(level=logging.DEBUG) # Here comes the Django settings deprecation section. Being at the very end # of settings.py allows it to catch the settings defined in local_settings.py # or inside one of local_settings.d/ snippets.
Heiffeng
A high-performance, thread-safe, time-expiring Map<K,V> for Java 17 Built on ConcurrentHashMap + DelayQueue with a background cleaner, lazy cleanup on reads, version-guarded eviction, and Duration-based TTL APIs.
L422Y
Extended JavaScript Map with caching, loading, expiry features... and more
ackerleytng
A custom protocol mapper for keycloak that allows extension of expiry time (exp) in Access tokens and ID tokens
SiteQ8
TLS Cert Expiry Radar v2.0 — Monitor SSL/TLS certificate expiry with interactive radar, doughnut, bar charts, domain scanner, dark mode, global map, timeline search/filter | Zero-install browser app | 12 demo certs
parth494
Oingo is a platform that allows users to share information based on temporal, social and geographical constraints. It basically allows users to create a note, tag it to a location, have an expiry and make it visible to a set of people. For a better visualization, It has two different views for homepage – Map view and List view
#!/usr/bin/env python # Copyright 2018 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import base64 import datetime import json import os import shutil import tempfile import unittest import mock import yaml from six import PY3, next from kubernetes.client import Configuration from .config_exception import ConfigException from .kube_config import (ENV_KUBECONFIG_PATH_SEPARATOR, ConfigNode, FileOrData, KubeConfigLoader, KubeConfigMerger, _cleanup_temp_files, _create_temp_file_with_content, list_kube_config_contexts, load_kube_config, new_client_from_config) BEARER_TOKEN_FORMAT = "Bearer %s" EXPIRY_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" # should be less than kube_config.EXPIRY_SKEW_PREVENTION_DELAY PAST_EXPIRY_TIMEDELTA = 2 # should be more than kube_config.EXPIRY_SKEW_PREVENTION_DELAY FUTURE_EXPIRY_TIMEDELTA = 60 NON_EXISTING_FILE = "zz_non_existing_file_472398324" def _base64(string): return base64.standard_b64encode(string.encode()).decode() def _urlsafe_unpadded_b64encode(string): return base64.urlsafe_b64encode(string.encode()).decode().rstrip("=") def _format_expiry_datetime(dt): return dt.strftime(EXPIRY_DATETIME_FORMAT) def _get_expiry(loader, active_context): expired_gcp_conf = ( item for item in loader._config.value.get("users") if item.get("name") == active_context) return next(expired_gcp_conf).get("user").get("auth-provider") \ .get("config").get("expiry") def _raise_exception(st): raise Exception(st) TEST_FILE_KEY = "file" TEST_DATA_KEY = "data" TEST_FILENAME = "test-filename" TEST_DATA = "test-data" TEST_DATA_BASE64 = _base64(TEST_DATA) TEST_ANOTHER_DATA = "another-test-data" TEST_ANOTHER_DATA_BASE64 = _base64(TEST_ANOTHER_DATA) TEST_HOST = "test-host" TEST_USERNAME = "me" TEST_PASSWORD = "pass" # token for me:pass TEST_BASIC_TOKEN = "Basic bWU6cGFzcw==" DATETIME_EXPIRY_PAST = datetime.datetime.utcnow() - datetime.timedelta( minutes=PAST_EXPIRY_TIMEDELTA) DATETIME_EXPIRY_FUTURE = datetime.datetime.utcnow() + datetime.timedelta( minutes=FUTURE_EXPIRY_TIMEDELTA) TEST_TOKEN_EXPIRY_PAST = _format_expiry_datetime(DATETIME_EXPIRY_PAST) TEST_SSL_HOST = "https://test-host" TEST_CERTIFICATE_AUTH = "cert-auth" TEST_CERTIFICATE_AUTH_BASE64 = _base64(TEST_CERTIFICATE_AUTH) TEST_CLIENT_KEY = "client-key" TEST_CLIENT_KEY_BASE64 = _base64(TEST_CLIENT_KEY) TEST_CLIENT_CERT = "client-cert" TEST_CLIENT_CERT_BASE64 = _base64(TEST_CLIENT_CERT) TEST_OIDC_TOKEN = "test-oidc-token" TEST_OIDC_INFO = "{\"name\": \"test\"}" TEST_OIDC_BASE = ".".join([ _urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN), _urlsafe_unpadded_b64encode(TEST_OIDC_INFO) ]) TEST_OIDC_LOGIN = ".".join( [TEST_OIDC_BASE, _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT_BASE64)]) TEST_OIDC_TOKEN = "Bearer %s" % TEST_OIDC_LOGIN TEST_OIDC_EXP = "{\"name\": \"test\",\"exp\": 536457600}" TEST_OIDC_EXP_BASE = _urlsafe_unpadded_b64encode( TEST_OIDC_TOKEN) + "." + _urlsafe_unpadded_b64encode(TEST_OIDC_EXP) TEST_OIDC_EXPIRED_LOGIN = ".".join( [TEST_OIDC_EXP_BASE, _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT)]) TEST_OIDC_CONTAINS_RESERVED_CHARACTERS = ".".join([ _urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN), _urlsafe_unpadded_b64encode(TEST_OIDC_INFO).replace("a", "+"), _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT) ]) TEST_OIDC_INVALID_PADDING_LENGTH = ".".join([ _urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN), "aaaaa", _urlsafe_unpadded_b64encode(TEST_CLIENT_CERT) ]) TEST_OIDC_CA = _base64(TEST_CERTIFICATE_AUTH) class BaseTestCase(unittest.TestCase): def setUp(self): self._temp_files = [] def tearDown(self): for f in self._temp_files: os.remove(f) def _create_temp_file(self, content=""): handler, name = tempfile.mkstemp() self._temp_files.append(name) os.write(handler, str.encode(content)) os.close(handler) return name def expect_exception(self, func, message_part, *args, **kwargs): with self.assertRaises(ConfigException) as context: func(*args, **kwargs) self.assertIn(message_part, str(context.exception)) class TestFileOrData(BaseTestCase): @staticmethod def get_file_content(filename): with open(filename) as f: return f.read() def test_file_given_file(self): temp_filename = _create_temp_file_with_content(TEST_DATA) obj = {TEST_FILE_KEY: temp_filename} t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_file_given_non_existing_file(self): temp_filename = NON_EXISTING_FILE obj = {TEST_FILE_KEY: temp_filename} t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY) self.expect_exception(t.as_file, "does not exists") def test_file_given_data(self): obj = {TEST_DATA_KEY: TEST_DATA_BASE64} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_file_given_data_no_base64(self): obj = {TEST_DATA_KEY: TEST_DATA} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY, base64_file_content=False) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_data_given_data(self): obj = {TEST_DATA_KEY: TEST_DATA_BASE64} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY) self.assertEqual(TEST_DATA_BASE64, t.as_data()) def test_data_given_file(self): obj = {TEST_FILE_KEY: self._create_temp_file(content=TEST_DATA)} t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY) self.assertEqual(TEST_DATA_BASE64, t.as_data()) def test_data_given_file_no_base64(self): obj = {TEST_FILE_KEY: self._create_temp_file(content=TEST_DATA)} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, base64_file_content=False) self.assertEqual(TEST_DATA, t.as_data()) def test_data_given_file_and_data(self): obj = { TEST_DATA_KEY: TEST_DATA_BASE64, TEST_FILE_KEY: self._create_temp_file(content=TEST_ANOTHER_DATA) } t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY) self.assertEqual(TEST_DATA_BASE64, t.as_data()) def test_file_given_file_and_data(self): obj = { TEST_DATA_KEY: TEST_DATA_BASE64, TEST_FILE_KEY: self._create_temp_file(content=TEST_ANOTHER_DATA) } t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_file_with_custom_dirname(self): tempfile = self._create_temp_file(content=TEST_DATA) tempfile_dir = os.path.dirname(tempfile) tempfile_basename = os.path.basename(tempfile) obj = {TEST_FILE_KEY: tempfile_basename} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, file_base_path=tempfile_dir) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_create_temp_file_with_content(self): self.assertEqual( TEST_DATA, self.get_file_content(_create_temp_file_with_content(TEST_DATA))) _cleanup_temp_files() def test_file_given_data_bytes(self): obj = {TEST_DATA_KEY: TEST_DATA_BASE64.encode()} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) def test_file_given_data_bytes_no_base64(self): obj = {TEST_DATA_KEY: TEST_DATA.encode()} t = FileOrData( obj=obj, file_key_name=TEST_FILE_KEY, data_key_name=TEST_DATA_KEY, base64_file_content=False) self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) class TestConfigNode(BaseTestCase): test_obj = { "key1": "test", "key2": ["a", "b", "c"], "key3": { "inner_key": "inner_value" }, "with_names": [{ "name": "test_name", "value": "test_value" }, { "name": "test_name2", "value": {"key1", "test"} }, { "name": "test_name3", "value": [1, 2, 3] }], "with_names_dup": [{ "name": "test_name", "value": "test_value" }, { "name": "test_name", "value": {"key1", "test"} }, { "name": "test_name3", "value": [1, 2, 3] }] } def setUp(self): super(TestConfigNode, self).setUp() self.node = ConfigNode("test_obj", self.test_obj) def test_normal_map_array_operations(self): self.assertEqual("test", self.node["key1"]) self.assertEqual(5, len(self.node)) self.assertEqual("test_obj/key2", self.node["key2"].name) self.assertEqual(["a", "b", "c"], self.node["key2"].value) self.assertEqual("b", self.node["key2"][1]) self.assertEqual(3, len(self.node["key2"])) self.assertEqual("test_obj/key3", self.node["key3"].name) self.assertEqual({"inner_key": "inner_value"}, self.node["key3"].value) self.assertEqual("inner_value", self.node["key3"]["inner_key"]) self.assertEqual(1, len(self.node["key3"])) def test_get_with_name(self): node = self.node["with_names"] self.assertEqual("test_value", node.get_with_name("test_name")["value"]) self.assertTrue(isinstance(node.get_with_name("test_name2"), ConfigNode)) self.assertTrue(isinstance(node.get_with_name("test_name3"), ConfigNode)) self.assertEqual("test_obj/with_names[name=test_name2]", node.get_with_name("test_name2").name) self.assertEqual("test_obj/with_names[name=test_name3]", node.get_with_name("test_name3").name) def test_key_does_not_exists(self): self.expect_exception(lambda: self.node["not-exists-key"], "Expected key not-exists-key in test_obj") self.expect_exception(lambda: self.node["key3"]["not-exists-key"], "Expected key not-exists-key in test_obj/key3") def test_get_with_name_on_invalid_object(self): self.expect_exception( lambda: self.node["key2"].get_with_name("no-name"), "Expected all values in test_obj/key2 list to have \'name\' key") def test_get_with_name_on_non_list_object(self): self.expect_exception(lambda: self.node["key3"].get_with_name("no-name"), "Expected test_obj/key3 to be a list") def test_get_with_name_on_name_does_not_exists(self): self.expect_exception( lambda: self.node["with_names"].get_with_name("no-name"), "Expected object with name no-name in test_obj/with_names list") def test_get_with_name_on_duplicate_name(self): self.expect_exception( lambda: self.node["with_names_dup"].get_with_name("test_name"), "Expected only one object with name test_name in " "test_obj/with_names_dup list") class FakeConfig: FILE_KEYS = ["ssl_ca_cert", "key_file", "cert_file"] def __init__(self, token=None, **kwargs): self.api_key = {} if token: self.api_key["authorization"] = token self.__dict__.update(kwargs) def __eq__(self, other): if len(self.__dict__) != len(other.__dict__): return for k, v in self.__dict__.items(): if k not in other.__dict__: return if k in self.FILE_KEYS: if v and other.__dict__[k]: try: with open(v) as f1, open(other.__dict__[k]) as f2: if f1.read() != f2.read(): return except IOError: # fall back to only compare filenames in case we are # testing the passing of filenames to the config if other.__dict__[k] != v: return else: if other.__dict__[k] != v: return else: if other.__dict__[k] != v: return return True def __repr__(self): rep = "\n" for k, v in self.__dict__.items(): val = v if k in self.FILE_KEYS: try: with open(v) as f: val = "FILE: %s" % str.decode(f.read()) except IOError as e: val = "ERROR: %s" % str(e) rep += "\t%s: %s\n" % (k, val) return "Config(%s\n)" % rep class TestKubeConfigLoader(BaseTestCase): TEST_KUBE_CONFIG = { "current-context": "no_user", "contexts": [ { "name": "no_user", "context": { "cluster": "default" } }, { "name": "simple_token", "context": { "cluster": "default", "user": "simple_token" } }, { "name": "gcp", "context": { "cluster": "default", "user": "gcp" } }, { "name": "expired_gcp", "context": { "cluster": "default", "user": "expired_gcp" } }, { "name": "expired_gcp_refresh", "context": { "cluster": "default", "user": "expired_gcp_refresh" } }, { "name": "oidc", "context": { "cluster": "default", "user": "oidc" } }, { "name": "expired_oidc", "context": { "cluster": "default", "user": "expired_oidc" } }, { "name": "expired_oidc_nocert", "context": { "cluster": "default", "user": "expired_oidc_nocert" } }, { "name": "oidc_contains_reserved_character", "context": { "cluster": "default", "user": "oidc_contains_reserved_character" } }, { "name": "oidc_invalid_padding_length", "context": { "cluster": "default", "user": "oidc_invalid_padding_length" } }, { "name": "user_pass", "context": { "cluster": "default", "user": "user_pass" } }, { "name": "ssl", "context": { "cluster": "ssl", "user": "ssl" } }, { "name": "no_ssl_verification", "context": { "cluster": "no_ssl_verification", "user": "ssl" } }, { "name": "ssl-no_file", "context": { "cluster": "ssl-no_file", "user": "ssl-no_file" } }, { "name": "ssl-local-file", "context": { "cluster": "ssl-local-file", "user": "ssl-local-file" } }, { "name": "non_existing_user", "context": { "cluster": "default", "user": "non_existing_user" } }, { "name": "exec_cred_user", "context": { "cluster": "default", "user": "exec_cred_user" } }, ], "clusters": [ { "name": "default", "cluster": { "server": TEST_HOST } }, { "name": "ssl-no_file", "cluster": { "server": TEST_SSL_HOST, "certificate-authority": TEST_CERTIFICATE_AUTH, } }, { "name": "ssl-local-file", "cluster": { "server": TEST_SSL_HOST, "certificate-authority": "cert_test", } }, { "name": "ssl", "cluster": { "server": TEST_SSL_HOST, "certificate-authority-data": TEST_CERTIFICATE_AUTH_BASE64, } }, { "name": "no_ssl_verification", "cluster": { "server": TEST_SSL_HOST, "insecure-skip-tls-verify": "true", } }, ], "users": [ { "name": "simple_token", "user": { "token": TEST_DATA_BASE64, "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, { "name": "gcp", "user": { "auth-provider": { "name": "gcp", "config": { "access-token": TEST_DATA_BASE64, } }, "token": TEST_DATA_BASE64, # should be ignored "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, { "name": "expired_gcp", "user": { "auth-provider": { "name": "gcp", "config": { "access-token": TEST_DATA_BASE64, "expiry": TEST_TOKEN_EXPIRY_PAST, # always in past } }, "token": TEST_DATA_BASE64, # should be ignored "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, # Duplicated from "expired_gcp" so test_load_gcp_token_with_refresh # is isolated from test_gcp_get_api_key_with_prefix. { "name": "expired_gcp_refresh", "user": { "auth-provider": { "name": "gcp", "config": { "access-token": TEST_DATA_BASE64, "expiry": TEST_TOKEN_EXPIRY_PAST, # always in past } }, "token": TEST_DATA_BASE64, # should be ignored "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, { "name": "oidc", "user": { "auth-provider": { "name": "oidc", "config": { "id-token": TEST_OIDC_LOGIN } } } }, { "name": "expired_oidc", "user": { "auth-provider": { "name": "oidc", "config": { "client-id": "tectonic-kubectl", "client-secret": "FAKE_SECRET", "id-token": TEST_OIDC_EXPIRED_LOGIN, "idp-certificate-authority-data": TEST_OIDC_CA, "idp-issuer-url": "https://example.org/identity", "refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" } } } }, { "name": "expired_oidc_nocert", "user": { "auth-provider": { "name": "oidc", "config": { "client-id": "tectonic-kubectl", "client-secret": "FAKE_SECRET", "id-token": TEST_OIDC_EXPIRED_LOGIN, "idp-issuer-url": "https://example.org/identity", "refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" } } } }, { "name": "oidc_contains_reserved_character", "user": { "auth-provider": { "name": "oidc", "config": { "client-id": "tectonic-kubectl", "client-secret": "FAKE_SECRET", "id-token": TEST_OIDC_CONTAINS_RESERVED_CHARACTERS, "idp-issuer-url": "https://example.org/identity", "refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" } } } }, { "name": "oidc_invalid_padding_length", "user": { "auth-provider": { "name": "oidc", "config": { "client-id": "tectonic-kubectl", "client-secret": "FAKE_SECRET", "id-token": TEST_OIDC_INVALID_PADDING_LENGTH, "idp-issuer-url": "https://example.org/identity", "refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" } } } }, { "name": "user_pass", "user": { "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, { "name": "ssl-no_file", "user": { "token": TEST_DATA_BASE64, "client-certificate": TEST_CLIENT_CERT, "client-key": TEST_CLIENT_KEY, } }, { "name": "ssl-local-file", "user": { "tokenFile": "token_file", "client-certificate": "client_cert", "client-key": "client_key", } }, { "name": "ssl", "user": { "token": TEST_DATA_BASE64, "client-certificate-data": TEST_CLIENT_CERT_BASE64, "client-key-data": TEST_CLIENT_KEY_BASE64, } }, { "name": "exec_cred_user", "user": { "exec": { "apiVersion": "client.authentication.k8s.io/v1beta1", "command": "aws-iam-authenticator", "args": ["token", "-i", "dummy-cluster"] } } }, ] } def test_no_user_context(self): expected = FakeConfig(host=TEST_HOST) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="no_user").load_and_set(actual) self.assertEqual(expected, actual) def test_simple_token(self): expected = FakeConfig( host=TEST_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="simple_token").load_and_set(actual) self.assertEqual(expected, actual) def test_load_user_token(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="simple_token") self.assertTrue(loader._load_user_token()) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, loader.token) def test_gcp_no_refresh(self): fake_config = FakeConfig() # swagger-generated config has this, but FakeConfig does not. self.assertFalse(hasattr(fake_config, "get_api_key_with_prefix")) KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="gcp", get_google_credentials=lambda: _raise_exception("SHOULD NOT BE CALLED") ).load_and_set(fake_config) # Should now be populated with a gcp token fetcher. self.assertIsNotNone(fake_config.get_api_key_with_prefix) self.assertEqual(TEST_HOST, fake_config.host) # For backwards compatibility, authorization field should still be set. self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, fake_config.api_key["authorization"]) def test_load_gcp_token_no_refresh(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="gcp", get_google_credentials=lambda: _raise_exception("SHOULD NOT BE CALLED")) self.assertTrue(loader._load_auth_provider_token()) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, loader.token) def test_load_gcp_token_with_refresh(self): def cred(): return None cred.token = TEST_ANOTHER_DATA_BASE64 cred.expiry = datetime.datetime.utcnow() loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="expired_gcp", get_google_credentials=lambda: cred) original_expiry = _get_expiry(loader, "expired_gcp") self.assertTrue(loader._load_auth_provider_token()) new_expiry = _get_expiry(loader, "expired_gcp") # assert that the configs expiry actually updates self.assertTrue(new_expiry > original_expiry) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, loader.token) def test_gcp_get_api_key_with_prefix(self): class cred_old: token = TEST_DATA_BASE64 expiry = DATETIME_EXPIRY_PAST class cred_new: token = TEST_ANOTHER_DATA_BASE64 expiry = DATETIME_EXPIRY_FUTURE fake_config = FakeConfig() _get_google_credentials = mock.Mock() _get_google_credentials.side_effect = [cred_old, cred_new] loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="expired_gcp_refresh", get_google_credentials=_get_google_credentials) loader.load_and_set(fake_config) original_expiry = _get_expiry(loader, "expired_gcp_refresh") # Call GCP token fetcher. token = fake_config.get_api_key_with_prefix() new_expiry = _get_expiry(loader, "expired_gcp_refresh") self.assertTrue(new_expiry > original_expiry) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, loader.token) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, token) def test_oidc_no_refresh(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="oidc", ) self.assertTrue(loader._load_auth_provider_token()) self.assertEqual(TEST_OIDC_TOKEN, loader.token) @mock.patch("kubernetes.config.kube_config.OAuth2Session.refresh_token") @mock.patch("kubernetes.config.kube_config.ApiClient.request") def test_oidc_with_refresh(self, mock_ApiClient, mock_OAuth2Session): mock_response = mock.MagicMock() type(mock_response).status = mock.PropertyMock(return_value=200) type(mock_response).data = mock.PropertyMock( return_value=json.dumps( {"token_endpoint": "https://example.org/identity/token"})) mock_ApiClient.return_value = mock_response mock_OAuth2Session.return_value = { "id_token": "abc123", "refresh_token": "newtoken123" } loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="expired_oidc", ) self.assertTrue(loader._load_auth_provider_token()) self.assertEqual("Bearer abc123", loader.token) @mock.patch("kubernetes.config.kube_config.OAuth2Session.refresh_token") @mock.patch("kubernetes.config.kube_config.ApiClient.request") def test_oidc_with_refresh_nocert(self, mock_ApiClient, mock_OAuth2Session): mock_response = mock.MagicMock() type(mock_response).status = mock.PropertyMock(return_value=200) type(mock_response).data = mock.PropertyMock( return_value=json.dumps( {"token_endpoint": "https://example.org/identity/token"})) mock_ApiClient.return_value = mock_response mock_OAuth2Session.return_value = { "id_token": "abc123", "refresh_token": "newtoken123" } loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="expired_oidc_nocert", ) self.assertTrue(loader._load_auth_provider_token()) self.assertEqual("Bearer abc123", loader.token) def test_oidc_fails_if_contains_reserved_chars(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="oidc_contains_reserved_character", ) self.assertEqual( loader._load_oid_token("oidc_contains_reserved_character"), None, ) def test_oidc_fails_if_invalid_padding_length(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="oidc_invalid_padding_length", ) self.assertEqual( loader._load_oid_token("oidc_invalid_padding_length"), None, ) def test_user_pass(self): expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="user_pass").load_and_set(actual) self.assertEqual(expected, actual) def test_load_user_pass_token(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="user_pass") self.assertTrue(loader._load_user_pass_token()) self.assertEqual(TEST_BASIC_TOKEN, loader.token) def test_ssl_no_cert_files(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="ssl-no_file") self.expect_exception(loader.load_and_set, "does not exists", FakeConfig()) def test_ssl(self): expected = FakeConfig( host=TEST_SSL_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, cert_file=self._create_temp_file(TEST_CLIENT_CERT), key_file=self._create_temp_file(TEST_CLIENT_KEY), ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH)) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="ssl").load_and_set(actual) self.assertEqual(expected, actual) def test_ssl_no_verification(self): expected = FakeConfig( host=TEST_SSL_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, cert_file=self._create_temp_file(TEST_CLIENT_CERT), key_file=self._create_temp_file(TEST_CLIENT_KEY), verify_ssl=False, ssl_ca_cert=None, ) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="no_ssl_verification").load_and_set(actual) self.assertEqual(expected, actual) def test_list_contexts(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="no_user") actual_contexts = loader.list_contexts() expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)["contexts"] for actual in actual_contexts: expected = expected_contexts.get_with_name(actual["name"]) self.assertEqual(expected.value, actual) def test_current_context(self): loader = KubeConfigLoader(config_dict=self.TEST_KUBE_CONFIG) expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)["contexts"] self.assertEqual( expected_contexts.get_with_name("no_user").value, loader.current_context) def test_set_active_context(self): loader = KubeConfigLoader(config_dict=self.TEST_KUBE_CONFIG) loader.set_active_context("ssl") expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)["contexts"] self.assertEqual( expected_contexts.get_with_name("ssl").value, loader.current_context) def test_ssl_with_relative_ssl_files(self): expected = FakeConfig( host=TEST_SSL_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, cert_file=self._create_temp_file(TEST_CLIENT_CERT), key_file=self._create_temp_file(TEST_CLIENT_KEY), ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH)) try: temp_dir = tempfile.mkdtemp() actual = FakeConfig() with open(os.path.join(temp_dir, "cert_test"), "wb") as fd: fd.write(TEST_CERTIFICATE_AUTH.encode()) with open(os.path.join(temp_dir, "client_cert"), "wb") as fd: fd.write(TEST_CLIENT_CERT.encode()) with open(os.path.join(temp_dir, "client_key"), "wb") as fd: fd.write(TEST_CLIENT_KEY.encode()) with open(os.path.join(temp_dir, "token_file"), "wb") as fd: fd.write(TEST_DATA_BASE64.encode()) KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="ssl-local-file", config_base_path=temp_dir).load_and_set(actual) self.assertEqual(expected, actual) finally: shutil.rmtree(temp_dir) def test_load_kube_config(self): expected = FakeConfig( host=TEST_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64) config_file = self._create_temp_file(yaml.safe_dump(self.TEST_KUBE_CONFIG)) actual = FakeConfig() load_kube_config( config_file=config_file, context="simple_token", client_configuration=actual) self.assertEqual(expected, actual) def test_list_kube_config_contexts(self): config_file = self._create_temp_file(yaml.safe_dump(self.TEST_KUBE_CONFIG)) contexts, active_context = list_kube_config_contexts( config_file=config_file) self.assertDictEqual(self.TEST_KUBE_CONFIG["contexts"][0], active_context) if PY3: self.assertCountEqual(self.TEST_KUBE_CONFIG["contexts"], contexts) else: self.assertItemsEqual(self.TEST_KUBE_CONFIG["contexts"], contexts) def test_new_client_from_config(self): config_file = self._create_temp_file(yaml.safe_dump(self.TEST_KUBE_CONFIG)) client = new_client_from_config( config_file=config_file, context="simple_token") self.assertEqual(TEST_HOST, client.configuration.host) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, client.configuration.api_key["authorization"]) def test_no_users_section(self): expected = FakeConfig(host=TEST_HOST) actual = FakeConfig() test_kube_config = self.TEST_KUBE_CONFIG.copy() del test_kube_config["users"] KubeConfigLoader( config_dict=test_kube_config, active_context="gcp").load_and_set(actual) self.assertEqual(expected, actual) def test_non_existing_user(self): expected = FakeConfig(host=TEST_HOST) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="non_existing_user").load_and_set(actual) self.assertEqual(expected, actual) @mock.patch("kubernetes.config.kube_config.ExecProvider.run") def test_user_exec_auth(self, mock): token = "dummy" mock.return_value = {"token": token} expected = FakeConfig( host=TEST_HOST, api_key={"authorization": BEARER_TOKEN_FORMAT % token}) actual = FakeConfig() KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="exec_cred_user").load_and_set(actual) self.assertEqual(expected, actual) class TestKubernetesClientConfiguration(BaseTestCase): # Verifies properties of kubernetes.client.Configuration. # These tests guard against changes to the upstream configuration class, # since GCP authorization overrides get_api_key_with_prefix to refresh its # token regularly. def test_get_api_key_with_prefix_exists(self): self.assertTrue(hasattr(Configuration, "get_api_key_with_prefix")) def test_get_api_key_with_prefix_returns_token(self): expected_token = "expected_token" config = Configuration() config.api_key["authorization"] = expected_token self.assertEqual(expected_token, config.get_api_key_with_prefix("authorization")) def test_auth_settings_calls_get_api_key_with_prefix(self): expected_token = "expected_token" def fake_get_api_key_with_prefix(identifier): self.assertEqual("authorization", identifier) return expected_token config = Configuration() config.get_api_key_with_prefix = fake_get_api_key_with_prefix self.assertEqual(expected_token, config.auth_settings()["BearerToken"]["value"]) class TestKubeConfigMerger(BaseTestCase): TEST_KUBE_CONFIG_PART1 = { "current-context": "no_user", "contexts": [{ "name": "no_user", "context": { "cluster": "default" } },], "clusters": [{ "name": "default", "cluster": { "server": TEST_HOST } },], "users": [] } TEST_KUBE_CONFIG_PART2 = { "current-context": "", "contexts": [ { "name": "ssl", "context": { "cluster": "ssl", "user": "ssl" } }, { "name": "simple_token", "context": { "cluster": "default", "user": "simple_token" } }, ], "clusters": [{ "name": "ssl", "cluster": { "server": TEST_SSL_HOST, "certificate-authority-data": TEST_CERTIFICATE_AUTH_BASE64, } },], "users": [{ "name": "ssl", "user": { "token": TEST_DATA_BASE64, "client-certificate-data": TEST_CLIENT_CERT_BASE64, "client-key-data": TEST_CLIENT_KEY_BASE64, } },] } TEST_KUBE_CONFIG_PART3 = { "current-context": "no_user", "contexts": [ { "name": "expired_oidc", "context": { "cluster": "default", "user": "expired_oidc" } }, { "name": "ssl", "context": { "cluster": "skipped-part2-defined-this-context", "user": "skipped" } }, ], "clusters": [], "users": [ { "name": "expired_oidc", "user": { "auth-provider": { "name": "oidc", "config": { "client-id": "tectonic-kubectl", "client-secret": "FAKE_SECRET", "id-token": TEST_OIDC_EXPIRED_LOGIN, "idp-certificate-authority-data": TEST_OIDC_CA, "idp-issuer-url": "https://example.org/identity", "refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" } } } }, { "name": "simple_token", "user": { "token": TEST_DATA_BASE64, "username": TEST_USERNAME, # should be ignored "password": TEST_PASSWORD, # should be ignored } }, ] } def _create_multi_config(self): files = [] for part in (self.TEST_KUBE_CONFIG_PART1, self.TEST_KUBE_CONFIG_PART2, self.TEST_KUBE_CONFIG_PART3): files.append(self._create_temp_file(yaml.safe_dump(part))) return ENV_KUBECONFIG_PATH_SEPARATOR.join(files) def test_list_kube_config_contexts(self): kubeconfigs = self._create_multi_config() expected_contexts = [{ "context": { "cluster": "default" }, "name": "no_user" }, { "context": { "cluster": "ssl", "user": "ssl" }, "name": "ssl" }, { "context": { "cluster": "default", "user": "simple_token" }, "name": "simple_token" }, { "context": { "cluster": "default", "user": "expired_oidc" }, "name": "expired_oidc" }] contexts, active_context = list_kube_config_contexts( config_file=kubeconfigs) self.assertEqual(contexts, expected_contexts) self.assertEqual(active_context, expected_contexts[0]) def test_new_client_from_config(self): kubeconfigs = self._create_multi_config() client = new_client_from_config( config_file=kubeconfigs, context="simple_token") self.assertEqual(TEST_HOST, client.configuration.host) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, client.configuration.api_key["authorization"]) def test_save_changes(self): kubeconfigs = self._create_multi_config() # load configuration, update token, save config kconf = KubeConfigMerger(kubeconfigs) user = kconf.config["users"].get_with_name("expired_oidc")["user"] provider = user["auth-provider"]["config"] provider.value["id-token"] = "token-changed" kconf.save_changes() # re-read configuration kconf = KubeConfigMerger(kubeconfigs) user = kconf.config["users"].get_with_name("expired_oidc")["user"] provider = user["auth-provider"]["config"] # new token self.assertEqual(provider.value["id-token"], "token-changed") if __name__ == "__main__": unittest.main()
cndousx
No description available
evanx
A JavaScript map with expiring members for caching.
juntaki
sync.Map with expiry period
hansrajchoudhary
No description available
anujiuee
No description available
united-manufacturing-hub
A thread-safe go map with item expiry
flrxnt
Map short codes to long URLs; add custom aliases, expiry, click analytics, and abuse prevention.
bharatkumarR
This project helps developers to consume additional features on top of java datastructures. Like event based list and time based expiry for Map
benpaymaster
A decentralized platform for real-time pharmacy inventory and expiry management. Uses Solidity smart contracts, distributed systems, and AI to help users locate medication, reduce waste, and optimize stock with a live map, expiry alerts, and secure, scalable backend.
guno1928
High-performance concurrent Go map with lock-free reads, atomic writes, TTL expiry, hit-limited entries, copy-on-write values, and scale-aware sharding.
Murtaza-gandhi
The given data was distributed in multiple sheets and I have to analyze and map them by there Expiry sales and profit, and create the summary on the sales.
erohit12
The system where the tickets are mapped to the next available resolver as per his skills and the ticket has to be resolved within its expiry date (if any)
m-mukund
A Redis-inspired in-memory KV store written in modern C++17/20. Features a custom TCP server, RESP protocol parser, concurrent hash map, TTL expiry, and LRU eviction. b\Built from scratch.
santyssk
Android app that allows user to mark and navigate to location of parked vehicle using LocationService and integrating with Google Maps API. The app also provide weather details and parking expiry notifications using Alarm and NotificationServices.
aitsam-cmyk
QuickBuy is a modern online marketplace to buy and sell products easily. It offers ad posting with images, user profiles, favorites, notifications, and map-based locations. Users can edit, delete, or mark ads as sold, with automatic expiry after 30 days.
jaypatel2409
InvySphere is a MERN-based inventory management system with real-time tracking, geolocation via Google Maps, role-based access, Cloudinary image uploads, and smart alerts for low stock and expiry. It features a responsive UI and interactive dashboard for seamless inventory control.
gananwen
FoodieBox is a Flutter mobile app that helps reduce food waste by enabling users to buy leftover restaurant meals and near-expiry groceries at discounted prices. It features real-time item availability, search filters, secure ordering, and Google Maps–based delivery address selection.
SWETHAPRASANNA187
AGRI is a production-ready mobile agriculture marketplace UI/UX system connecting organic farmers and customers. It includes validated farmer onboarding, certificate verification, product expiry automation, AI voice assistant, map-based farm discovery, pick-your-own booking, home delivery checkout, notifications, and multilingual support.
saketchaudhary94
simple and efficient web app to manage your kitchen inventory. Add ingredients with quantity and expiry dates, and get automatic alerts for items that are expiring soon or already expired. Built with React and JavaScript using a Hash Map-based system for quick access and updates.
Georgie-GitHub
A social dining app where users discover nearby restaurants, select one, and share plans with friends. The Restaurants Tab shows a map with details and a 12-hour selection expiry. The Friends Tab displays where friends are dining and allows adding contacts to coordinate plans easily.
farreltr
We're going to make a very simple 2D side-scroller with an interactive fiction component. For gameplay, we'll have a runner (boat) who moves towards the right of the screen. The player needs to avoid obstacles and pick up buffs for as long as possible. These obstacles can come in three different flavours. Random encounter obstacles will consist the main IF component of the game. They will be triggered by the expiry of a timer or by a hot spot on the map and will determine the outcome of the game as time goes on. Other obstacles will reduce the number of people on the boat and disappear after doing so. Pickups will increase the number of people on the boat. Graphics will be in simple pixel art. To start with we will use cubes and standard particle systems. Sound and music needs to be considered.
All 29 repositories loaded