Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit e6a310b

Browse files
committed
Reimplement text_position and related functions to use Boyer-Moore-Horspool
searching instead of naive matching. In the worst case this has the same O(M*N) complexity as the naive method, but the worst case is hard to hit, and the average case is very fast, especially with longer patterns. David Rowley
1 parent 2cf3f66 commit e6a310b

File tree

1 file changed

+185
-30
lines changed

1 file changed

+185
-30
lines changed

src/backend/utils/adt/varlena.c

+185-30
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
*
99
*
1010
* IDENTIFICATION
11-
* $PostgreSQL: pgsql/src/backend/utils/adt/varlena.c,v 1.167 2008/05/27 00:13:09 tgl Exp $
11+
* $PostgreSQL: pgsql/src/backend/utils/adt/varlena.c,v 1.168 2008/09/07 04:20:00 tgl Exp $
1212
*
1313
*-------------------------------------------------------------------------
1414
*/
@@ -39,6 +39,9 @@ typedef struct
3939
pg_wchar *wstr2; /* note: these are palloc'd */
4040
int len1; /* string lengths in logical characters */
4141
int len2;
42+
/* Skip table for Boyer-Moore-Horspool search algorithm: */
43+
int skiptablemask; /* mask for ANDing with skiptable subscripts */
44+
int skiptable[256]; /* skip distance for given mismatched char */
4245
} TextPositionState;
4346

4447
#define DatumGetUnknownP(X) ((unknown *) PG_DETOAST_DATUM(X))
@@ -753,7 +756,7 @@ text_substring(Datum str, int32 start, int32 length, bool length_not_specified)
753756
* If we're working with an untoasted source, no need to do an extra
754757
* copying step.
755758
*/
756-
if (VARATT_IS_COMPRESSED(DatumGetPointer(str)) ||
759+
if (VARATT_IS_COMPRESSED(DatumGetPointer(str)) ||
757760
VARATT_IS_EXTERNAL(DatumGetPointer(str)))
758761
slice = DatumGetTextPSlice(str, slice_start, slice_size);
759762
else
@@ -866,6 +869,7 @@ text_position(text *t1, text *t2)
866869
return result;
867870
}
868871

872+
869873
/*
870874
* text_position_setup, text_position_next, text_position_cleanup -
871875
* Component steps of text_position()
@@ -909,64 +913,215 @@ text_position_setup(text *t1, text *t2, TextPositionState *state)
909913
state->len1 = len1;
910914
state->len2 = len2;
911915
}
916+
917+
/*
918+
* Prepare the skip table for Boyer-Moore-Horspool searching. In these
919+
* notes we use the terminology that the "haystack" is the string to be
920+
* searched (t1) and the "needle" is the pattern being sought (t2).
921+
*
922+
* If the needle is empty or bigger than the haystack then there is no
923+
* point in wasting cycles initializing the table. We also choose not
924+
* to use B-M-H for needles of length 1, since the skip table can't
925+
* possibly save anything in that case.
926+
*/
927+
if (len1 >= len2 && len2 > 1)
928+
{
929+
int searchlength = len1 - len2;
930+
int skiptablemask;
931+
int last;
932+
int i;
933+
934+
/*
935+
* First we must determine how much of the skip table to use. The
936+
* declaration of TextPositionState allows up to 256 elements, but for
937+
* short search problems we don't really want to have to initialize so
938+
* many elements --- it would take too long in comparison to the
939+
* actual search time. So we choose a useful skip table size based on
940+
* the haystack length minus the needle length. The closer the needle
941+
* length is to the haystack length the less useful skipping becomes.
942+
*
943+
* Note: since we use bit-masking to select table elements, the skip
944+
* table size MUST be a power of 2, and so the mask must be 2^N-1.
945+
*/
946+
if (searchlength < 16)
947+
skiptablemask = 3;
948+
else if (searchlength < 64)
949+
skiptablemask = 7;
950+
else if (searchlength < 128)
951+
skiptablemask = 15;
952+
else if (searchlength < 512)
953+
skiptablemask = 31;
954+
else if (searchlength < 2048)
955+
skiptablemask = 63;
956+
else if (searchlength < 4096)
957+
skiptablemask = 127;
958+
else
959+
skiptablemask = 255;
960+
state->skiptablemask = skiptablemask;
961+
962+
/*
963+
* Initialize the skip table. We set all elements to the needle
964+
* length, since this is the correct skip distance for any character
965+
* not found in the needle.
966+
*/
967+
for (i = 0; i <= skiptablemask; i++)
968+
state->skiptable[i] = len2;
969+
970+
/*
971+
* Now examine the needle. For each character except the last one,
972+
* set the corresponding table element to the appropriate skip
973+
* distance. Note that when two characters share the same skip table
974+
* entry, the one later in the needle must determine the skip distance.
975+
*/
976+
last = len2 - 1;
977+
978+
if (!state->use_wchar)
979+
{
980+
const char *str2 = state->str2;
981+
982+
for (i = 0; i < last; i++)
983+
state->skiptable[(unsigned char) str2[i] & skiptablemask] = last - i;
984+
}
985+
else
986+
{
987+
const pg_wchar *wstr2 = state->wstr2;
988+
989+
for (i = 0; i < last; i++)
990+
state->skiptable[wstr2[i] & skiptablemask] = last - i;
991+
}
992+
}
912993
}
913994

914995
static int
915996
text_position_next(int start_pos, TextPositionState *state)
916997
{
917-
int pos = 0,
918-
p,
919-
px;
998+
int haystack_len = state->len1;
999+
int needle_len = state->len2;
1000+
int skiptablemask = state->skiptablemask;
9201001

9211002
Assert(start_pos > 0); /* else caller error */
9221003

923-
if (state->len2 <= 0)
1004+
if (needle_len <= 0)
9241005
return start_pos; /* result for empty pattern */
9251006

1007+
start_pos--; /* adjust for zero based arrays */
1008+
1009+
/* Done if the needle can't possibly fit */
1010+
if (haystack_len < start_pos + needle_len)
1011+
return 0;
1012+
9261013
if (!state->use_wchar)
9271014
{
9281015
/* simple case - single byte encoding */
929-
char *p1 = state->str1;
930-
char *p2 = state->str2;
1016+
const char *haystack = state->str1;
1017+
const char *needle = state->str2;
1018+
const char *haystack_end = &haystack[haystack_len];
1019+
const char *hptr;
9311020

932-
/* no use in searching str past point where search_str will fit */
933-
px = (state->len1 - state->len2);
934-
935-
p1 += start_pos - 1;
1021+
if (needle_len == 1)
1022+
{
1023+
/* No point in using B-M-H for a one-character needle */
1024+
char nchar = *needle;
9361025

937-
for (p = start_pos - 1; p <= px; p++)
1026+
hptr = &haystack[start_pos];
1027+
while (hptr < haystack_end)
1028+
{
1029+
if (*hptr == nchar)
1030+
return hptr - haystack + 1;
1031+
hptr++;
1032+
}
1033+
}
1034+
else
9381035
{
939-
if ((*p1 == *p2) && (strncmp(p1, p2, state->len2) == 0))
1036+
const char *needle_last = &needle[needle_len - 1];
1037+
1038+
/* Start at startpos plus the length of the needle */
1039+
hptr = &haystack[start_pos + needle_len - 1];
1040+
while (hptr < haystack_end)
9401041
{
941-
pos = p + 1;
942-
break;
1042+
/* Match the needle scanning *backward* */
1043+
const char *nptr;
1044+
const char *p;
1045+
1046+
nptr = needle_last;
1047+
p = hptr;
1048+
while (*nptr == *p)
1049+
{
1050+
/* Matched it all? If so, return 1-based position */
1051+
if (nptr == needle)
1052+
return p - haystack + 1;
1053+
nptr--, p--;
1054+
}
1055+
/*
1056+
* No match, so use the haystack char at hptr to decide how
1057+
* far to advance. If the needle had any occurrence of that
1058+
* character (or more precisely, one sharing the same
1059+
* skiptable entry) before its last character, then we advance
1060+
* far enough to align the last such needle character with
1061+
* that haystack position. Otherwise we can advance by the
1062+
* whole needle length.
1063+
*/
1064+
hptr += state->skiptable[(unsigned char) *hptr & skiptablemask];
9431065
}
944-
p1++;
9451066
}
9461067
}
9471068
else
9481069
{
949-
/* not as simple - multibyte encoding */
950-
pg_wchar *p1 = state->wstr1;
951-
pg_wchar *p2 = state->wstr2;
1070+
/* The multibyte char version. This works exactly the same way. */
1071+
const pg_wchar *haystack = state->wstr1;
1072+
const pg_wchar *needle = state->wstr2;
1073+
const pg_wchar *haystack_end = &haystack[haystack_len];
1074+
const pg_wchar *hptr;
9521075

953-
/* no use in searching str past point where search_str will fit */
954-
px = (state->len1 - state->len2);
955-
956-
p1 += start_pos - 1;
1076+
if (needle_len == 1)
1077+
{
1078+
/* No point in using B-M-H for a one-character needle */
1079+
pg_wchar nchar = *needle;
9571080

958-
for (p = start_pos - 1; p <= px; p++)
1081+
hptr = &haystack[start_pos];
1082+
while (hptr < haystack_end)
1083+
{
1084+
if (*hptr == nchar)
1085+
return hptr - haystack + 1;
1086+
hptr++;
1087+
}
1088+
}
1089+
else
9591090
{
960-
if ((*p1 == *p2) && (pg_wchar_strncmp(p1, p2, state->len2) == 0))
1091+
const pg_wchar *needle_last = &needle[needle_len - 1];
1092+
1093+
/* Start at startpos plus the length of the needle */
1094+
hptr = &haystack[start_pos + needle_len - 1];
1095+
while (hptr < haystack_end)
9611096
{
962-
pos = p + 1;
963-
break;
1097+
/* Match the needle scanning *backward* */
1098+
const pg_wchar *nptr;
1099+
const pg_wchar *p;
1100+
1101+
nptr = needle_last;
1102+
p = hptr;
1103+
while (*nptr == *p)
1104+
{
1105+
/* Matched it all? If so, return 1-based position */
1106+
if (nptr == needle)
1107+
return p - haystack + 1;
1108+
nptr--, p--;
1109+
}
1110+
/*
1111+
* No match, so use the haystack char at hptr to decide how
1112+
* far to advance. If the needle had any occurrence of that
1113+
* character (or more precisely, one sharing the same
1114+
* skiptable entry) before its last character, then we advance
1115+
* far enough to align the last such needle character with
1116+
* that haystack position. Otherwise we can advance by the
1117+
* whole needle length.
1118+
*/
1119+
hptr += state->skiptable[*hptr & skiptablemask];
9641120
}
965-
p1++;
9661121
}
9671122
}
9681123

969-
return pos;
1124+
return 0; /* not found */
9701125
}
9711126

9721127
static void

0 commit comments

Comments
 (0)