Skip to content

Instantly share code, notes, and snippets.

@paschmaria
Created May 20, 2020 09:57
Show Gist options
  • Save paschmaria/a93d5cd8cc68fe9d72bd223511bdbe01 to your computer and use it in GitHub Desktop.
Save paschmaria/a93d5cd8cc68fe9d72bd223511bdbe01 to your computer and use it in GitHub Desktop.
import datetime
import warnings
from django.db import models
from django.db.models.signals import post_delete, post_save, pre_delete
try:
from django.utils.timezone import now
except ImportError:
now = datetime.datetime.now
# define basestring for python 3
try:
basestring
except NameError:
basestring = (str, bytes)
class PositionField(models.IntegerField):
def __init__(self, verbose_name=None, name=None, default=-1, collection=None, parent_link=None, unique_for_field=None, unique_for_fields=None, *args, **kwargs):
if 'unique' in kwargs:
raise TypeError("%s can't have a unique constraint." % self.__class__.__name__)
super(PositionField, self).__init__(verbose_name, name, default=default, *args, **kwargs)
# Backwards-compatibility mess begins here.
if collection is not None and unique_for_field is not None:
raise TypeError("'collection' and 'unique_for_field' are incompatible arguments.")
if collection is not None and unique_for_fields is not None:
raise TypeError("'collection' and 'unique_for_fields' are incompatible arguments.")
if unique_for_field is not None:
warnings.warn("The 'unique_for_field' argument is deprecated. Please use 'collection' instead.", DeprecationWarning)
if unique_for_fields is not None:
raise TypeError("'unique_for_field' and 'unique_for_fields' are incompatible arguments.")
collection = unique_for_field
if unique_for_fields is not None:
warnings.warn("The 'unique_for_fields' argument is deprecated. Please use 'collection' instead.", DeprecationWarning)
collection = unique_for_fields
# Backwards-compatibility mess ends here.
if isinstance(collection, basestring):
collection = (collection,)
self.collection = collection
self.parent_link = parent_link
self._collection_changed = None
def contribute_to_class(self, cls, name):
super(PositionField, self).contribute_to_class(cls, name)
for constraint in cls._meta.unique_together:
if self.name in constraint:
raise TypeError("%s can't be part of a unique constraint." % self.__class__.__name__)
self.auto_now_fields = []
for field in cls._meta.fields:
if getattr(field, 'auto_now', False):
self.auto_now_fields.append(field)
setattr(cls, self.name, self)
pre_delete.connect(self.prepare_delete, sender=cls)
post_delete.connect(self.update_on_delete, sender=cls)
post_save.connect(self.update_on_save, sender=cls)
def pre_save(self, model_instance, add):
#NOTE: check if the node has been moved to another collection; if it has, delete it from the old collection.
previous_instance = None
collection_changed = False
if not add and self.collection is not None:
previous_instance = type(model_instance)._default_manager.get(pk=model_instance.pk)
for field_name in self.collection:
field = model_instance._meta.get_field(field_name)
current_field_value = getattr(model_instance, field.attname)
previous_field_value = getattr(previous_instance, field.attname)
if previous_field_value != current_field_value:
collection_changed = True
break
if not collection_changed:
previous_instance = None
self._collection_changed = collection_changed
if collection_changed:
self.remove_from_collection(previous_instance)
cache_name = self.get_cache_name()
current, updated = getattr(model_instance, cache_name)
if collection_changed:
current = None
if add:
if updated is None:
updated = current
current = None
#elif updated is None:
# updated = -1
# existing instance, position not modified; no cleanup required
if current is not None and updated is None:
return current
collection_count = self.get_collection(model_instance).count()
if current is None:
max_position = collection_count
else:
max_position = collection_count - 1
min_position = 0
# new instance; appended; no cleanup required on post_save
if add and (updated == -1 or updated >= max_position):
setattr(model_instance, cache_name, (max_position, None))
return max_position
if max_position >= updated >= min_position:
# positive position; valid index
position = updated
elif updated > max_position:
# positive position; invalid index
position = max_position
elif abs(updated) <= (max_position + 1):
# negative position; valid index
# Add 1 to max_position to make this behave like a negative list index.
# -1 means the last position, not the last position minus 1
position = max_position + 1 + updated
else:
# negative position; invalid index
position = min_position
# instance inserted; cleanup required on post_save
setattr(model_instance, cache_name, (current, position))
return position
def __get__(self, instance, owner):
if instance is None:
raise AttributeError("%s must be accessed via instance." % self.name)
current, updated = getattr(instance, self.get_cache_name())
return current if updated is None else updated
def __set__(self, instance, value):
if instance is None:
raise AttributeError("%s must be accessed via instance." % self.name)
if value is None:
value = self.default
cache_name = self.get_cache_name()
try:
current, updated = getattr(instance, cache_name)
except AttributeError:
current, updated = value, None
else:
updated = value
instance.__dict__[self.name] = value # Django 1.10 fix for deferred fields
setattr(instance, cache_name, (current, updated))
def get_collection(self, instance):
filters = {}
if self.collection is not None:
for field_name in self.collection:
field = instance._meta.get_field(field_name)
field_value = getattr(instance, field.attname)
if field.null and field_value is None:
filters['%s__isnull' % field.name] = True
else:
filters[field.name] = field_value
model = type(instance)
parent_link = self.parent_link
if parent_link is not None:
model = model._meta.get_field(parent_link).rel.to
return model._default_manager.filter(**filters)
def get_next_sibling(self, instance):
"""
Returns the next sibling of this instance.
"""
try:
return self.get_collection(instance).filter(**{'%s__gt' % self.name: getattr(instance, self.get_cache_name())[0]})[0]
except:
return None
def remove_from_collection(self, instance):
"""
Removes a positioned item from the collection.
"""
queryset = self.get_collection(instance)
current = getattr(instance, self.get_cache_name())[0]
updates = {self.name: models.F(self.name) - 1}
if self.auto_now_fields:
right_now = now()
for field in self.auto_now_fields:
updates[field.name] = right_now
queryset.filter(**{'%s__gt' % self.name: current}).update(**updates)
def prepare_delete(self, sender, instance, **kwargs):
next_sibling = self.get_next_sibling(instance)
if next_sibling:
setattr(instance, '_next_sibling_pk', next_sibling.pk)
else:
setattr(instance, '_next_sibling_pk', None)
pass
def update_on_delete(self, sender, instance, **kwargs):
next_sibling_pk = getattr(instance, '_next_sibling_pk', None)
if next_sibling_pk:
try:
next_sibling = type(instance)._default_manager.get(pk=next_sibling_pk)
except:
next_sibling = None
if next_sibling:
queryset = self.get_collection(next_sibling)
current = getattr(instance, self.get_cache_name())[0]
updates = {self.name: models.F(self.name) - 1}
if self.auto_now_fields:
right_now = now()
for field in self.auto_now_fields:
updates[field.name] = right_now
queryset.filter(**{'%s__gt' % self.name: current}).update(**updates)
setattr(instance, '_next_sibling_pk', None)
def update_on_save(self, sender, instance, created, **kwargs):
collection_changed = self._collection_changed
self._collection_changed = None
current, updated = getattr(instance, self.get_cache_name())
if updated is None and not collection_changed:
return None
queryset = self.get_collection(instance).exclude(pk=instance.pk)
updates = {}
if self.auto_now_fields:
right_now = now()
for field in self.auto_now_fields:
updates[field.name] = right_now
if updated is None and created:
updated = -1
if created or collection_changed:
# increment positions gte updated or node moved from another collection
queryset = queryset.filter(**{'%s__gte' % self.name: updated})
updates[self.name] = models.F(self.name) + 1
elif updated > current:
# decrement positions gt current and lte updated
queryset = queryset.filter(**{'%s__gt' % self.name: current, '%s__lte' % self.name: updated})
updates[self.name] = models.F(self.name) - 1
else:
# increment positions lt current and gte updated
queryset = queryset.filter(**{'%s__lt' % self.name: current, '%s__gte' % self.name: updated})
updates[self.name] = models.F(self.name) + 1
queryset.update(**updates)
setattr(instance, self.get_cache_name(), (updated, None))
def south_field_triple(self):
from south.modelsinspector import introspector
field_class = "django.db.models.fields.IntegerField"
args, kwargs = introspector(self)
return (field_class, args, kwargs)
# Usage
class Playlist(models.Model):
title = models.CharField(max_length=120)
updated = models.DateTimeField(auto_now=True)
timestamp = models.DateTimeField(auto_now_add=True)
class PlaylistItem(models.Model):
playlist = models.ForeignKey(PlayList, on_delete=models.SET_NULL, null=True)
video = models.ForeignKey(Video, on_delete=models.SET_NULL , null=True)
description = models.TextField(null=True, blank=True)
order = PositionField(collection='playlist')
updated = models.DateTimeField(auto_now=True)
timestamp = models.DateTimeField(auto_now_add=True)
import random
import string
def random_string_generator(size=10, chars=string.ascii_lowercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
print(random_string_generator())
print(random_string_generator(size=50))
from django.utils.text import slugify
from random_string_generator import random_string_generator
def unique_slug_generator(instance, new_slug=None):
"""
This is for a Django project and it assumes your instance
has a model with a slug field and a title character (char) field.
"""
if new_slug is not None:
slug = new_slug
else:
slug = slugify(instance.title)
Klass = instance.__class__
qs_exists = Klass.objects.filter(slug=slug).exists()
if qs_exists:
new_slug = "{slug}-{randstr}".format(
slug=slug,
randstr=random_string_generator(size=4)
)
return unique_slug_generator(instance, new_slug=new_slug)
return slug
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment