Jesse Ponugoti (ponugoti)
@ponugoti
ponugoti / update_variables_dag.py
Created May 14, 2022 00:54
How to enable Slack (success/failure) Airflow alerts
from datetime import datetime
import logging
from pathlib import Path
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from helpers.DagHelper import get_dag_id
from helpers.SlackHelper import post_slack_alert
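
The preview shows only the imports; below is a hedged sketch of how the Slack helper is typically wired in as task callbacks. The post_slack_alert and get_dag_id signatures are assumptions, not the gist's actual code.

# Sketch, building on the imports above. Airflow passes the task context dict
# to success/failure callbacks; post_slack_alert is assumed to accept it.
default_args = {
    'owner': 'airflow',
    'on_failure_callback': post_slack_alert,
    'on_success_callback': post_slack_alert,
}

def update_variables() -> None:
    """Hypothetical task body: refresh an Airflow Variable."""
    Variable.set('variables_last_updated', datetime.utcnow().isoformat())
    logging.info('Airflow variables refreshed')

with DAG(
    dag_id=get_dag_id(Path(__file__)),  # assumed: helper derives the DAG id from the file path
    start_date=datetime(2022, 5, 1),
    schedule_interval='@daily',
    catchup=False,
    default_args=default_args,
) as dag:
    PythonOperator(task_id='update_variables', python_callable=update_variables)

The snippets below switch to a separate Tecton debugging session from the same gist list.
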
import tecton  # needed below for tecton.get_workspace; missing from the preview
from tecton import (
    stream_window_aggregate_feature_view,
    Input,
    transformation,
    FeatureAggregation,
    materialization_context,
)
from tecton.aggregation_functions import last_distinct
from suggestions import config
ws = tecton.get_workspace('dev-jesse')
sdf = ws.get_feature_view('user_group_ids_24h').get_historical_features(from_source=True).to_pandas()
sdf.loc[sdf.user_id.eq('012e7960-e36b-4a31-98b0-2ad8abac2567') & sdf.start_date.eq('2021-12-16')]
# OUTPUT -- as expected with 4 non-uuid strings under 'group_id'
# tenant_id user_id start_date group_id_lastn300_24h_5m timestamp
# 9 4db5938e-2439-4ae7-b881-49435e9b2fe9 012e7960-e36b-4a31-98b0-2ad8abac2567 2021-12-16 [SREAL-70, TMPRO-2047, QP-223, TMA-13469] 2021-12-16 00:05:00
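
A hedged reconstruction of the feature view behind that output, using only the names imported above (Tecton 0.3-era SDK). The data source, entity, and SQL are guesses; the output column name group_id_lastn300_24h_5m suggests last_distinct(300) over a 24h window with a 5m slide.

@stream_window_aggregate_feature_view(
    inputs={'events': Input(config.events_stream)},  # hypothetical source from suggestions.config
    entities=[config.user_entity],                   # hypothetical entity
    mode='spark_sql',
    aggregation_slide_period='5m',
    aggregations=[
        FeatureAggregation(column='group_id', function=last_distinct(300), time_windows='24h'),
    ],
)
def user_group_ids_24h(events, context=materialization_context()):
    # In spark_sql mode the input name is substituted as a table reference.
    return f"SELECT tenant_id, user_id, group_id, timestamp FROM {events}"
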
import pandas as pd
ws = tecton.get_workspace('dev-jesse')
spine = pd.DataFrame({
    'tenant_id': ['4db5938e-2439-4ae7-b881-49435e9b2fe9'],
    'user_id': ['008641bc-516b-4b1e-9699-853624cb186d'],
    'start_date': ['2021-12-28'],
    'timestamp': pd.to_datetime(['2021-12-29']),
})
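
The preview cuts off before the call that triggered the errors below; presumably the spine was passed to the same feature view, along these lines (a reconstruction, not the gist's code):

fv = ws.get_feature_view('user_group_ids_24h')
result = fv.get_historical_features(spine, from_source=True).to_pandas()
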
An error was encountered:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/mnt/yarn/usercache/livy/appcache/application_1639628500827_0175/container_1639628500827_0175_01_000139/pyspark.zip/pyspark/worker.py", line 604, in main
process()
File "/mnt/yarn/usercache/livy/appcache/application_1639628500827_0175/container_1639628500827_0175_01_000139/pyspark.zip/pyspark/worker.py", line 596, in process
serializer.dump_stream(out_iter, outfile)
File "/mnt/yarn/usercache/livy/appcache/application_1639628500827_0175/container_1639628500827_0175_01_000139/pyspark.zip/pyspark/sql/pandas/serializers.py", line 273, in dump_stream
return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/usr/local/lib64/python3.7/site-packages/pendulum/parsing/__init__.py", line 132, in _parse
text, dayfirst=options["day_first"], yearfirst=options["year_first"]
File "/usr/local/lib/python3.7/site-packages/dateutil/parser/_parser.py", line 1368, in parse
return DEFAULTPARSER.parse(timestr, **kwargs)
File "/usr/local/lib/python3.7/site-packages/dateutil/parser/_parser.py", line 643, in parse
raise ParserError("Unknown string format: %s", timestr)
dateutil.parser._parser.ParserError: Unknown string format: None
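
The ParserError bottoming out at "Unknown string format: None" suggests a null value reached pendulum's date parsing. A hypothetical pre-flight check on the spine before retrying:

# Hypothetical sanity check: verify the spine has no nulls and that the
# timestamp column is a real datetime64 rather than a string.
assert spine.notna().all().all(), 'spine contains null values'
assert pd.api.types.is_datetime64_any_dtype(spine['timestamp']), 'timestamp must be datetime64'
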
An error was encountered:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/mnt/yarn/usercache/livy/appcache/application_1639628500827_0031/container_1639628500827_0031_01_000052/pyspark.zip/pyspark/worker.py", line 604, in main
process()
File "/mnt/yarn/usercache/livy/appcache/application_1639628500827_0031/container_1639628500827_0031_01_000052/pyspark.zip/pyspark/worker.py", line 596, in process
serializer.dump_stream(out_iter, outfile)
File "/mnt/yarn/usercache/livy/appcache/application_1639628500827_0031/container_1639628500827_0031_01_000052/pyspark.zip/pyspark/sql/pandas/serializers.py", line 273, in dump_stream
return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
An error was encountered:
Writing job aborted.
=== Streaming Query ===
Identifier: [id = 40f18491-6ac7-4dad-8320-36c925579ef2, runId = d4b690e6-687c-4b7f-baf9-95cd6779273b]
Current Committed Offsets: {KinesisSource[pyspark-kinesis]: {"metadata":{"streamName":"pyspark-kinesis","batchId":"83"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49624913852314749156571341911281053750143952580360798210"}}}
Current Available Offsets: {KinesisSource[pyspark-kinesis]: {"metadata":{"streamName":"pyspark-kinesis","batchId":"84"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49624913852314749156571341911281053750143952580360798210"}}}
Current State: ACTIVE
Thread State: RUNNABLE
Collecting setuptools_scm
Downloading https://files.pythonhosted.org/packages/bc/bf/353180314d0e27929703faf240c244f25ae765e01f595a010cafb209ab51/setuptools_scm-6.3.2-py3-none-any.whl
Requirement already satisfied: packaging>=20.0 in /usr/local/lib/python3.7/site-packages (from setuptools_scm)
Collecting tomli>=1.0.0 (from setuptools_scm)
Downloading https://files.pythonhosted.org/packages/e2/9f/5e1557a57a7282f066351086e78f87289a3446c47b2cb5b8b2f614d8fe99/tomli-2.0.0-py3-none-any.whl
Requirement already satisfied: setuptools in /mnt/tmp/1639545707814-0/lib/python3.7/site-packages (from setuptools_scm)
Requirement already satisfied: pyparsing!=3.0.5,>=2.0.2 in /usr/local/lib/python3.7/site-packages (from packaging>=20.0->setuptools_scm)
Installing collected packages: tomli, setuptools-scm
Successfully installed setuptools-scm-6.3.2 tomli-2.0.0
@ponugoti
ponugoti / cram_to_anki.py
Last active May 2, 2021 04:15
Move Flashcards from Cram to Anki
#!/usr/bin/python3.8
"""
Cram to Anki Conversion
-- CRAM EXPORT
-- Please note that this script was tested on 2021-04-29.
1. Navigate to a card deck on cram.com and click 'Export'
2. Select CSV