-
Notifications
You must be signed in to change notification settings - Fork 644
Expand file tree
/
Copy pathesql.py
More file actions
60 lines (48 loc) · 2.08 KB
/
esql.py
File metadata and controls
60 lines (48 loc) · 2.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""ESQL Query Parsing Classes."""
import re
from dataclasses import dataclass
@dataclass
class EventDataset:
"""Dataclass for event.dataset with integration and datastream parts."""
package: str
integration: str
def __str__(self) -> str:
return f"{self.package}.{self.integration}"
def get_esql_query_event_dataset_integrations(query: str) -> list[EventDataset]:
"""Extract event.dataset and data_stream.dataset integrations from an ES|QL query."""
number_of_parts = 2
# Regex patterns for event.dataset, and data_stream.dataset
# This mimics the logic in get_datasets_and_modules but for ES|QL as we do not have an ast
regex_patterns = {
"in": [
re.compile(r"event\.dataset\s+in\s*\(\s*([^)]+)\s*\)"),
re.compile(r"data_stream\.dataset\s+in\s*\(\s*([^)]+)\s*\)"),
],
"eq": [
re.compile(r'event\.dataset\s*==\s*"([^"]+)"'),
re.compile(r'data_stream\.dataset\s*==\s*"([^"]+)"'),
],
}
# Extract datasets
datasets: list[str] = []
for regex_list in regex_patterns.values():
for regex in regex_list:
matches = regex.findall(query)
if matches:
for match in matches:
if "," in match:
# Handle `in` case with multiple values
datasets.extend([ds.strip().strip('"') for ds in match.split(",")])
else:
# Handle `==` case
datasets.append(match.strip().strip('"'))
event_datasets: list[EventDataset] = []
for dataset in datasets:
parts = dataset.split(".")
if len(parts) == number_of_parts: # Ensure there are exactly two parts
event_datasets.append(EventDataset(package=parts[0], integration=parts[1]))
return event_datasets