Skip to content

abrupt_shaving

Abrupt Trend Handling Utilities

Functions for refining abrupt segments by detecting changepoints using z-score outliers.

shave_abrupt_trends(df, value_col, segments, method_params, second_pass=False, init_segments=None)

Refines abrupt segments by detecting changepoints using z-score outliers.

This function identifies sharp transitions missed by rolling statistics and adjusts segment boundaries based on statistical anomalies in the signal's first differences. It also supports multi-abrupt detection within a segment and optional padding to extend abrupt ends.

Parameters:

  • df

    (DataFrame) –

    Time series DataFrame.

  • value_col

    (str) –

    Name of the signal column.

  • segments

    (list) –

    List of segment dictionaries. Only segments with 'trend_class': 'abrupt' and a 'direction' of 'Up' or 'Down' are refined; all other segments are skipped.

  • method_params

    (dict) –

    Optional parameters for padding and control. Supported keys:

    • is_abrupt_padded (bool): Whether to pad abrupt segments. Defaults to False.
    • abrupt_padding (int): Number of days to pad. Defaults to 28.

Returns:

  • list ( list[dict] ) –

    Refined segment list with adjusted abrupt boundaries.

Source code in pytrendy/post_processing/segments_refine/abrupt_shaving.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def shave_abrupt_trends(df: pd.DataFrame, value_col: str, segments: list[dict], method_params: dict, second_pass: bool = False, init_segments: list[dict] | None = None) -> list[dict]:
    """
    Refines abrupt segments by detecting changepoints using z-score outliers.

    This function identifies sharp transitions missed by rolling statistics and
    adjusts segment boundaries based on statistical anomalies in the signal's first differences.
    It also supports multi-abrupt detection within a segment and optional padding to extend abrupt ends.

    Args:
        df (pd.DataFrame): Time series DataFrame.
        value_col (str): Name of the signal column.
        segments (list): List of segment dictionaries with `'trend_class': 'abrupt'`.
        method_params (dict): Optional parameters for padding and control. Supported keys:

            - **is_abrupt_padded** (`bool`): Whether to pad abrupt segments. Defaults to `False`.
            - **abrupt_padding** (`int`): Number of days to pad. Defaults to `28`.

    Returns:
        list: Refined segment list with adjusted abrupt boundaries.
    """

    if init_segments is None:
        init_segments = []

    segments_refined = deepcopy(segments)
    new_segments = []
    for i, segment in enumerate(segments_refined):
        if segment['direction'] not in ['Up', 'Down'] or segment['trend_class'] != 'abrupt': 
            continue

        if second_pass:
            init_segment = init_segments[i]
            is_not_prev_trend = 'trend_class' not in init_segment # edge case, in case not trend before
            is_not_reclassified = is_not_prev_trend or segment['trend_class'] == init_segment['trend_class']
            if is_not_reclassified:
                continue # exit if not re-classified for sake of second pass

        # Get start end padded for some leniency
        start = pd.to_datetime(segment['start']) - pd.Timedelta(days=2)
        end = pd.to_datetime(segment['end']) + pd.Timedelta(days=2)
        df_segment = df.loc[start:end].copy()

        # Use z-score on diff, to know when a change is an anomoly in the trend
        df_segment['diff'] = df_segment[value_col].diff()
        df_segment = df_segment.iloc[1:]
        df_segment['z_score'] = (df_segment['diff'] - df_segment['diff'].mean()) / df_segment['diff'].std()
        df_segment['abrupt_flag'] = 0
        df_segment.loc[(df_segment['z_score'].abs() > 1), 'abrupt_flag'] = 1

        # Note: Follows very similar code to process signals 3.4. 
        df_segment['abrupt_flag_diff'] = df_segment['abrupt_flag'].diff()
        abrupt_starts = df_segment.loc[df_segment['abrupt_flag_diff'] == 1].index
        abrupt_ends = df_segment.loc[df_segment['abrupt_flag_diff'] == -1].index

        # Construct abrupt sub-segments list based on flag_diff
        abrupt_subsegs = []
        for abrupt_start in abrupt_starts: # Loops from first start onwards
            after_ends = [end for end in abrupt_ends if end > abrupt_start]

            # Get abrupt end as
            if len(after_ends) > 0:
                abrupt_end = after_ends[0]  # first if aligned
            elif abrupt_start == df.index[-1]: 
                abrupt_end = min(abrupt_start + pd.Timedelta(days=1), df.index[-1])
            else:
                continue # neither if not connected

            abrupt_subsegs.append(dict(start=abrupt_start, end=abrupt_end))

        if len(abrupt_ends) > 0: # Adds abrupt end with no start if at beginning
            abrupt_end = abrupt_ends[0]
            early_starts = [start for start in abrupt_starts if start < abrupt_end]
            if len(early_starts) == 0:
                abrupt_start = max(abrupt_end - pd.Timedelta(days=1), df.index[0])
                abrupt_subsegs.insert(0, dict(start=abrupt_start, end=abrupt_end))

        # If in right direction shave out abrupt subsegs from abrupt segment & adjust neighbours.
        for j, abrupt_subseg in enumerate(abrupt_subsegs):
            new_start = abrupt_subseg['start'] - pd.Timedelta(days=1)
            new_end = abrupt_subseg['end'] - pd.Timedelta(days=1)

            start_value = df.loc[new_start, value_col] # referencing df, in case outside df_segment scope
            end_value = df.loc[new_end, value_col]
            value_change = end_value - start_value

            direction = 'Up' if value_change > 0 else 'Down'

            if direction != segment['direction']:
                continue

            if j == 0:
                # Update current segment
                segments_refined[i]['start'] = new_start.strftime('%Y-%m-%d')
                update_prev_segment(i, new_start, segments, segments_refined)

                segments_refined[i]['end'] = new_end.strftime('%Y-%m-%d')
                update_next_segment(i, new_end, segments, segments_refined)

            elif j > 0:
                # Wedge in a new segment between current and next (needed for edge case of many abrupt near each other)
                new_seg = segment.copy()
                new_seg['start'] = new_start.strftime('%Y-%m-%d')
                new_seg['end'] = new_end.strftime('%Y-%m-%d')
                new_segments.append((i, new_seg))  # Store with reference index

    # Add to main segments list, then sort.
    for offset, (base_index, new_seg) in enumerate(new_segments):
        insert_index = base_index + offset + 1
        segments_refined.insert(insert_index, new_seg)
        segments.insert(insert_index, new_seg)
        update_prev_segment(insert_index, pd.to_datetime(new_seg['start']), segments, segments_refined)
        update_next_segment(insert_index, pd.to_datetime(new_seg['end']), segments, segments_refined)
    segments_refined = sorted(segments_refined, key=lambda seg: pd.to_datetime(seg['start']))

    # Second pass to pad segments if specified
    segments_padded = deepcopy(segments_refined)
    if method_params.get('is_abrupt_padded', False) == True:

        meta_df = pd.DataFrame(segments_refined) # metadata df, to filter by datetime easily
        meta_df['start'] = pd.to_datetime(meta_df['start'])
        meta_df['end'] = pd.to_datetime(meta_df['end'])

        for i, segment in enumerate(segments_refined):

            if segment['direction'] not in ['Up', 'Down'] or segment['trend_class'] != 'abrupt': 
                continue

            abrupt_start = pd.to_datetime(segment['start'])
            abrupt_end = pd.to_datetime(segment['end'])

            # Simulate new end with padding and cater for any overlaps it might cause
            new_end = abrupt_end + pd.Timedelta(days=method_params['abrupt_padding'])
            overlaps = meta_df.loc[(meta_df['start'] > abrupt_end) & (meta_df['start'] <= new_end)]
            overlaps_nonflats = overlaps[overlaps['direction']!='Flat']

            # Adjust padding to be before first nonflat segment that it would overlap
            if not overlaps_nonflats.empty:
                first_notflat_overlap = overlaps_nonflats.iloc[0]
                new_end = pd.to_datetime(first_notflat_overlap['start']) - pd.Timedelta(days=1)

            new_end = min(new_end, df.index[-1]) # make sure doesnt go out of bounds
            segments_padded[i]['end'] = new_end.strftime('%Y-%m-%d')
            update_next_segment(i, new_end, segments_refined, segments_padded) # will always be a flat it adjusts/overwrites

            # Store meta data that got padded & stretched out
            segments_padded[i]['padded'] = True if new_end != abrupt_end else False

    return segments_padded