mirror of
https://github.com/workhardbekind/workout-challenge.git
synced 2026-07-04 01:13:32 -04:00
169 lines
7.2 KiB
Python
169 lines
7.2 KiB
Python
import datetime
|
|
|
|
from django.conf import settings
|
|
from django.apps import apps
|
|
from rest_framework.response import Response
|
|
from rest_framework import status
|
|
|
|
from django.db.models import Sum, Count, ExpressionWrapper, DurationField, F, Q, IntegerField
|
|
from django.db.models.functions import TruncDay, Now, ExtractDay
|
|
|
|
|
|
def _add_rank(data, key, enhance_dict, id_field, rank_field='rank', reverse=True):
|
|
sorted_data = sorted(data, key=lambda x: x[key], reverse=reverse)
|
|
rank = 0
|
|
last_value = None
|
|
user_lst = []
|
|
for idx, item in enumerate(sorted_data, start=1):
|
|
if item[key] != last_value:
|
|
rank = idx
|
|
last_value = item[key]
|
|
sorted_data[idx-1] = {**item, rank_field: rank, **enhance_dict[item[id_field]]}
|
|
enhance_dict[item[id_field]]['rank'] = rank
|
|
enhance_dict[item[id_field]]['points'] = item[key]
|
|
user_lst.append(item[id_field])
|
|
for i in [i for i in enhance_dict.keys() if i not in user_lst]:
|
|
sorted_data.append({**enhance_dict[i], rank_field: None, key: None})
|
|
enhance_dict[i]['rank'] = None
|
|
enhance_dict[i]['points'] = None
|
|
return sorted_data
|
|
|
|
|
|
def get_competition_stats(competition, last_seven_days=False):
|
|
CustomUser = apps.get_model('custom_user', 'CustomUser')
|
|
Competition = apps.get_model('competition', 'Competition')
|
|
Points = apps.get_model('competition', 'Points')
|
|
Team = apps.get_model('competition', 'Team')
|
|
|
|
# Custom query logic
|
|
try:
|
|
competition_obj = Competition.objects.get(id=competition)
|
|
except Competition.DoesNotExist:
|
|
return Response({"detail": "Competition not found."}, status=status.HTTP_404_NOT_FOUND)
|
|
all_points = Points.objects.filter(Q(award__competition__id=competition) | Q(goal__competition_id=competition))
|
|
|
|
if last_seven_days:
|
|
today = datetime.date.today()
|
|
last_sunday = today - datetime.timedelta(days=today.weekday() + 1) if today.weekday() != 6 else today
|
|
monday_before = last_sunday - datetime.timedelta(days=6)
|
|
all_points = all_points.filter(workout__start_datetime__gte=monday_before, workout__start_datetime__lt=last_sunday + datetime.timedelta(days=1))
|
|
|
|
# For SQLite
|
|
if settings.DATABASES.get('default', {}).get('ENGINE') == 'django.db.backends.sqlite3':
|
|
all_points_date = (
|
|
all_points
|
|
.annotate(date=TruncDay('workout__start_datetime'))
|
|
.annotate(tmp_today=TruncDay(Now()))
|
|
.annotate(tmp_start_date=TruncDay(F('workout__start_datetime')))
|
|
.annotate(days_ago=ExpressionWrapper((F('tmp_today') - F('tmp_start_date')) / 86_400_000_000, output_field=IntegerField()))
|
|
)
|
|
# For Postgres
|
|
else:
|
|
all_points_date = (
|
|
all_points
|
|
.annotate(date=TruncDay('workout__start_datetime'))
|
|
.annotate(days_ago_duration=ExpressionWrapper(TruncDay(Now()) - TruncDay(F('workout__start_datetime')), output_field=DurationField()))
|
|
.annotate(days_ago=ExtractDay(F('days_ago_duration')))
|
|
)
|
|
tmp_all = (
|
|
all_points_date
|
|
.values('days_ago')
|
|
.annotate(total=Sum('points_capped'))
|
|
.values('days_ago', 'total')
|
|
.order_by('-days_ago')
|
|
)
|
|
timeseries_all = {}
|
|
for i in tmp_all:
|
|
days_ago = i.pop('days_ago')
|
|
timeseries_all[days_ago] = i
|
|
|
|
tmp_user = (
|
|
all_points_date
|
|
.values('days_ago', 'workout__user__id')
|
|
.annotate(total=Sum('points_capped'))
|
|
.order_by('-days_ago')
|
|
)
|
|
timeseries_user = {}
|
|
for i in tmp_user:
|
|
user_id = i.pop('workout__user__id')
|
|
days_ago = i.pop('days_ago')
|
|
if user_id not in timeseries_user:
|
|
timeseries_user[user_id] = {}
|
|
timeseries_user[user_id][days_ago] = i
|
|
|
|
tmp_team = (
|
|
all_points_date
|
|
.values('days_ago', 'workout__user__my_teams')
|
|
.annotate(total=Sum('points_capped'))
|
|
.order_by('-days_ago')
|
|
)
|
|
timeseries_team = {}
|
|
for i in tmp_team:
|
|
team_id = i.pop('workout__user__my_teams')
|
|
days_ago = i.pop('days_ago')
|
|
if team_id not in timeseries_team:
|
|
timeseries_team[team_id] = {}
|
|
timeseries_team[team_id][days_ago] = i
|
|
|
|
# Get user data
|
|
user_dict = {i['id']: i for i in CustomUser.objects.filter(my_competitions=competition).values('id', 'username', 'strava_allow_follow', 'strava_athlete_id').order_by('username', 'id')}
|
|
for key, value in user_dict.items():
|
|
if value['strava_allow_follow'] is False:
|
|
value['strava_athlete_id'] = None
|
|
|
|
# Get user rankings
|
|
leaderboard_user = (
|
|
all_points
|
|
.values('workout__user__id')
|
|
.annotate(total_capped=Sum('points_capped'))
|
|
.order_by('-total_capped')
|
|
)
|
|
leaderboard_user = _add_rank(leaderboard_user, key="total_capped", enhance_dict=user_dict, id_field='workout__user__id')
|
|
leaderboard_user_dict = {i['id']: i for i in leaderboard_user}
|
|
|
|
# Get team data
|
|
team_dict = {i.id: {'id': i.id, 'name': i.name, 'members': [leaderboard_user_dict.get(i.id, {'id': i.id, 'username': 'ERROR', 'total_capped': None}) for i in i.user.all()]} for i in Team.objects.filter(competition=competition).prefetch_related('user')}
|
|
for key, value in team_dict.items():
|
|
value['active_member_count'] = sum(1 for i in value.get('members', []) if i.get('total_capped', 0) is not None and i.get('total_capped', 0) > 0)
|
|
value['member_count'] = len(value.get('members', []))
|
|
|
|
# Get team rankings
|
|
leaderboard_team = (
|
|
all_points
|
|
.values('workout__user__my_teams__id')
|
|
.annotate(total_capped=Sum('points_capped'))
|
|
.order_by('-total_capped')
|
|
)
|
|
leaderboard_team = [{**i, 'total_capped': i['total_capped'] / max(1, team_dict[i['workout__user__my_teams__id']]['active_member_count'])} for i in leaderboard_team if i['workout__user__my_teams__id'] in team_dict]
|
|
leaderboard_team = _add_rank(leaderboard_team, key="total_capped", enhance_dict=team_dict, id_field='workout__user__my_teams__id')
|
|
team_dict = {i['id']: i for i in leaderboard_team}
|
|
|
|
competition_details = {
|
|
'name': competition_obj.name,
|
|
'owner': user_dict.get(competition_obj.owner.pk, {'id': competition_obj.owner.pk, 'username': 'ERROR', 'total_capped': None}),
|
|
'members': [user_dict.get(i, {'id': i, 'username': 'ERROR', 'total_capped': None}) for i in list(competition_obj.user.all().values_list('pk', flat=True))],
|
|
'member_count': competition_obj.user.all().count(),
|
|
'active_member_count': len(timeseries_user),
|
|
'start_date': competition_obj.start_date,
|
|
'start_date_count': (datetime.date.today() - competition_obj.start_date).days,
|
|
'end_date': competition_obj.end_date,
|
|
'end_date_count': (datetime.date.today() - competition_obj.end_date).days,
|
|
'has_teams': competition_obj.has_teams,
|
|
'goals': competition_obj.activitygoal_set.all().values(),
|
|
}
|
|
|
|
response_obj = {
|
|
'competition': competition_details,
|
|
'users': user_dict,
|
|
'teams': team_dict,
|
|
'timeseries': {
|
|
'all': timeseries_all,
|
|
'user': timeseries_user,
|
|
'team': timeseries_team,
|
|
},
|
|
'leaderboard': {
|
|
'team': leaderboard_team,
|
|
'individual': leaderboard_user,
|
|
}
|
|
}
|
|
return response_obj
|