Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/deploy-production.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ jobs:
- name: Checkout repo
uses: actions/checkout@v6

- name: Set up Python 3.9
- name: Set up Python 3.13
uses: actions/setup-python@v6
with:
python-version: '3.9'
python-version: '3.13'
cache: 'pip'
cache-dependency-path: 'pyproject.toml'

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/deploy-qa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ jobs:
- name: Checkout repo
uses: actions/checkout@v6

- name: Set up Python 3.9
- name: Set up Python 3.13
uses: actions/setup-python@v6
with:
python-version: '3.9'
python-version: '3.13'
cache: 'pip'
cache-dependency-path: 'pyproject.toml'

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ jobs:
- name: Checkout repo
uses: actions/checkout@v6

- name: Set up Python 3.9
- name: Set up Python 3.13
uses: actions/setup-python@v6
with:
python-version: '3.9'
python-version: '3.13'
cache: 'pip'
cache-dependency-path: 'pyproject.toml'

Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# Changelog
## v1.10.0 3/17/26
- Add Snowflake client
- Update config helper to allow loading config files without PLAINTEXT/ENCRYPTED structure
- Update structured log helper to include name of the logger by default

## v1.9.1 3/11/26
- Add merge_contextvars to default structlog configuration

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This package contains common Python utility classes and functions.
* Connecting to and querying a MySQL database
* Connecting to and querying a PostgreSQL database
* Connecting to and querying Redshift
* Connecting to and querying Snowflake
* Making requests to the Oauth2 authenticated APIs such as NYPL Platform API and Sierra
* Interacting with vendor APIs such as cloudLibrary

Expand Down
10 changes: 7 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "nypl_py_utils"
version = "1.9.1"
version = "1.10.0"
authors = [
{ name="Aaron Friedman", email="aaronfriedman@nypl.org" },
]
Expand Down Expand Up @@ -74,6 +74,10 @@ sftp-client = [
"nypl_py_utils[log-helper]",
"paramiko>=3.4.1"
]
snowflake-client = [
"nypl_py_utils[log-helper]",
"snowflake-connector-python>=4.3.0"
]
config-helper = [
"nypl_py_utils[kms-client,log-helper]",
"PyYAML>=6.0"
Expand All @@ -93,11 +97,11 @@ research-catalog-identifier-helper = [
"requests>=2.28.1"
]
development = [
"nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,redshift-client,s3-client,secrets-manager-client,sftp-client,config-helper,log-helper,obfuscation-helper,patron-data-helper,research-catalog-identifier-helper]",
"nypl_py_utils[avro-client,cloudlibrary-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,redshift-client,s3-client,secrets-manager-client,sftp-client,snowflake-client,config-helper,log-helper,obfuscation-helper,patron-data-helper,research-catalog-identifier-helper]",
"flake8>=6.0.0",
"freezegun>=1.2.2",
"mock>=4.0.3",
"pytest==8.0",
"pytest>=8.0.0",
"pytest-mock>=3.10.0",
"requests-mock>=1.10.0"
]
Expand Down
132 changes: 132 additions & 0 deletions src/nypl_py_utils/classes/snowflake_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import snowflake.connector as sc

from nypl_py_utils.functions.log_helper import create_log


class SnowflakeClient:
    """Client for managing connections to Snowflake"""

    def __init__(self, account, user, private_key=None, password=None):
        """
        Parameters
        ----------
        account: str
            The Snowflake account identifier
        user: str
            The Snowflake username
        private_key: optional
            The private key for key-pair authentication; intended for
            production code (exact accepted type per the Snowflake
            connector docs -- TODO confirm)
        password: str, optional
            The password for password + MFA authentication; intended for
            local, human use
        """
        self.logger = create_log('snowflake_client')
        # Exactly one of password/private_key must be set: the equality is
        # true when both are None or when both are set, which are the two
        # invalid configurations
        if (password is None) == (private_key is None):
            raise SnowflakeClientError(
                'Either password or private key must be set (but not both)',
                self.logger
            ) from None

        self.conn = None
        self.account = account
        self.user = user
        self.private_key = private_key
        self.password = password

    def connect(self, mfa_code=None, **kwargs):
        """
        Connects to Snowflake using the given credentials. If you're connecting
        locally, you should be using the password and mfa_code. If the
        connection is for production code, a private_key should be set up.

        Parameters
        ----------
        mfa_code: str, optional
            The six-digit MFA code. Only necessary for connecting as a human
            user.
        kwargs:
            All possible arguments (such as which warehouse to use or how
            long to wait before timing out) can be found here:
            https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#connect
        """
        self.logger.info('Connecting to Snowflake')
        if self.private_key is not None:
            try:
                self.conn = sc.connect(
                    account=self.account,
                    user=self.user,
                    private_key=self.private_key,
                    **kwargs)
            except Exception as e:
                raise SnowflakeClientError(
                    f'Error connecting to Snowflake: {e}', self.logger
                ) from None
        else:
            if mfa_code is None:
                raise SnowflakeClientError(
                    'When using a password, an MFA code must also be provided',
                    self.logger
                ) from None

            # The connector accepts an MFA passcode appended to the password
            # when passcode_in_password is set
            pw = self.password + mfa_code
            try:
                self.conn = sc.connect(
                    account=self.account,
                    user=self.user,
                    password=pw,
                    passcode_in_password=True,
                    **kwargs)
            except Exception as e:
                raise SnowflakeClientError(
                    f'Error connecting to Snowflake: {e}', self.logger
                ) from None

    def execute_query(self, query, **kwargs):
        """
        Executes an arbitrary query against the given connection.

        Note that:
        1) All results will be fetched by default, so this method is not
        suitable if you do not want to load all rows into memory
        2) AUTOCOMMIT is on by default, so this method is not suitable if
        you want to execute multiple queries in a single transaction
        3) This method can be used for both read and write queries, but
        it's not optimized for writing -- there is no parameter binding
        or executemany support, and the return value for write queries
        can be unpredictable.

        Parameters
        ----------
        query: str
            The SQL query to execute
        kwargs:
            All possible arguments (such as timeouts) can be found here:
            https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#execute

        Returns
        -------
        sequence
            A list of tuples
        """
        self.logger.info('Querying Snowflake')
        cursor = self.conn.cursor()
        try:
            # try/finally guarantees the cursor is closed on both the success
            # and the error path; no inner except clause is needed
            try:
                cursor.execute(query, **kwargs)
                return cursor.fetchall()
            finally:
                cursor.close()
        except Exception as e:
            # If there was an error, also close the connection
            self.close_connection()

            # Truncate very long queries so the error message stays readable
            short_q = str(query)
            if len(short_q) > 2500:
                short_q = short_q[:2497] + '...'
            raise SnowflakeClientError(
                f'Error executing Snowflake query {short_q}: {e}', self.logger
            ) from None

    def close_connection(self):
        """Closes the connection"""
        self.logger.info('Closing Snowflake connection')
        self.conn.close()


class SnowflakeClientError(Exception):
    """Error raised by SnowflakeClient; optionally logs its own message."""

    def __init__(self, message='', logger=None):
        # Keep the message around for __str__; when a logger is supplied,
        # record the error immediately so callers don't have to.
        self.message = message
        if logger is None:
            return
        logger.error(message)

    def __str__(self):
        return self.message
62 changes: 40 additions & 22 deletions src/nypl_py_utils/functions/config_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@

def load_env_file(run_type, file_string):
"""
This method loads a YAML config file containing environment variables,
decrypts whichever are encrypted, and puts them all into os.environ as
strings. For a YAML variable containing a list of values, the list is
exported into os.environ as a json string and should be loaded as such.
This method reads a YAML config file containing environment variables and
loads them all into os.environ as strings. See _parse_yaml_dict for more.

It requires the YAML file to be split into a 'PLAINTEXT_VARIABLES' section
and an 'ENCRYPTED_VARIABLES' section. See config/sample.yaml for an example
config file.
If the config file is divided into 'PLAINTEXT_VARIABLES' and
'ENCRYPTED_VARIABLES' sections (see config/sample.yaml for an example), the
'ENCRYPTED_VARIABLES' variables will be decrypted first. Otherwise, all
variables will be loaded as is.

Parameters
----------
Expand All @@ -36,31 +35,50 @@ def load_env_file(run_type, file_string):
try:
env_dict = yaml.safe_load(env_stream)
except yaml.YAMLError:
logger.error('Invalid YAML file: {}'.format(open_file))
raise ConfigHelperError(
'Invalid YAML file: {}'.format(open_file)) from None
except FileNotFoundError:
logger.error('Could not find config file {}'.format(open_file))
raise ConfigHelperError(
'Could not find config file {}'.format(open_file)) from None

if env_dict:
for key, value in env_dict.get('PLAINTEXT_VARIABLES', {}).items():
if type(value) is list:
os.environ[key] = json.dumps(value)
else:
os.environ[key] = str(value)
if ('PLAINTEXT_VARIABLES' in env_dict
or 'ENCRYPTED_VARIABLES' in env_dict):
_parse_yaml_dict(env_dict.get('PLAINTEXT_VARIABLES', {}))

kms_client = KmsClient()
for key, value in env_dict.get('ENCRYPTED_VARIABLES', {}).items():
if type(value) is list:
decrypted_list = [kms_client.decrypt(v) for v in value]
os.environ[key] = json.dumps(decrypted_list)
else:
os.environ[key] = kms_client.decrypt(value)
kms_client.close()
kms_client = KmsClient()
_parse_yaml_dict(env_dict.get(
'ENCRYPTED_VARIABLES', {}), kms_client)
kms_client.close()
else:
_parse_yaml_dict(env_dict)


def _parse_yaml_dict(yaml_dict, kms_client=None):
"""
Loads YAML dict into os.environ. All values are stored as strings to match
how AWS Lambda environment variables are stored. For list variables, the
list is exported into os.environ as a json string.

If kms_client is not empty, decrypts the variables first.

Does not allow for sub-dictionaries.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any chance we'd use sub-dictionaries in our configs in the future? Or is that frowned upon?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not against it! I just think the use case for this file is loading config variables as environment variables, and it's unclear what the user would expect in the case of a subdictionary. Do all of the sub-keys get loaded as their own env variables, or does the whole dict get loaded as a JSON string, or? In general, my feeling is I haven't had a use case that required using sub-dictionaries, so there's no use trying to over-engineer this for a hypothetical use case.

"""
for key, value in yaml_dict.items():
if type(value) is dict:
raise ConfigHelperError(
'Found sub-dictionary in YAML config') from None
elif type(value) is list:
val = [kms_client.decrypt(v)
for v in value] if kms_client else value
os.environ[key] = json.dumps(val)
else:
val = kms_client.decrypt(value) if kms_client else value
os.environ[key] = str(val)


class ConfigHelperError(Exception):
    """Error raised by the config helper; logs its message on creation."""

    def __init__(self, message=None):
        # Record the message and, when one was supplied, surface it via the
        # module-level logger so config failures always appear in the logs.
        self.message = message
        if message is None:
            return
        logger.error(message)
40 changes: 24 additions & 16 deletions src/nypl_py_utils/functions/log_helper.py
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @yossariano -- I updated this again so that the name of the logger would be output. I also think this means that different struct loggers will now act a little more independently, which is good, although I could be wrong

Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,38 @@
}


# Configure structlog to be machine-readable first and foremost
# while still making it easy for humans to parse
# End result (without additional bindings) is JSON like this:
# {
# "logger": "module param",
# "message": "this is a test log event",
# "level": "info",
# "timestamp": "2023-11-01 18:50:47"
# }
def get_structlog(module):
structlog.configure(
"""
Standard logging without additional bindings looks as follows:
{
"level": "info",
"timestamp": "2026-01-01T12:00:00.613719Z",
"logger": "module param",
"message": "this is a test log event"
}

Note that: 1) you should *NOT* use the same module name for a structlog
and for a standard logger, and 2) using bind_contextvars will bind
variables to *all* loggers. To bind a context variable on one logger
without binding it to others, use `logger = logger.bind(contextvar=0)`.
"""
logger = logging.getLogger(module)
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO').upper())
logger.propagate = False # Prevents double logging

return structlog.wrap_logger(
logger,
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt='iso'),
structlog.stdlib.add_logger_name,
structlog.processors.EventRenamer('message'),
structlog.processors.JSONRenderer(),
],
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
]
)

return structlog.get_logger(module)


def standard_logger(module):
logger = logging.getLogger(module)
Expand All @@ -58,7 +66,7 @@ def standard_logger(module):


def create_log(module, json=False):
if (json):
if json:
return get_structlog(module)
else:
return standard_logger(module)
Loading
Loading