Setup

The first thing to do is to establish a connection with the Elasticsearch server. In our case, it is running locally on port 9200.

library(elasticquery)

con <- es_connect(
  host = "127.0.0.1", port = 9200,
  primary_index = "my-index")

This connection object is passed to queries so they know where to run.

Note that we can also specify the primary_index, which is the index queries we build will use unless we specify otherwise in the query. Here we are querying “my-index” by default.
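
For instance, a single query might target a different index like this (a purely hypothetical sketch: the index argument name is an assumption, not a documented part of the interface, so check the package documentation for the exact mechanism):

# Hypothetical sketch: override the connection's primary_index for one
# query. The argument name "index" is an assumption.
query <- query_agg(con, index = "other-index")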

Queries

Overview

Two major types of queries are currently supported by this package.

  1. Aggregation: Documents are counted by specified fields in the data, query initiated with query_agg()
  2. Fetch: Documents are retrieved according to specified criteria, query initiated with query_fetch()

After a query is initiated, it can be built upon by piping various operations (a combined sketch follows the list below):

  • agg_by_field(): specify a field to aggregate by, only for aggregation queries
  • agg_by_date(): specify a date field and date binning to aggregate by, only for aggregation queries
  • filter_match(): specify a field to filter documents on according to a string match (partial for text fields, exact for keyword fields), for both aggregation and fetch queries
  • filter_terms(): specify a field to filter documents on according to an exact string match, for both aggregation and fetch queries
  • filter_regexp(): specify a field to filter documents on according to a regular expression match, for both aggregation and fetch queries
  • filter_range(): specify a field to filter documents on according to a specified range, for both aggregation and fetch queries
  • select_fields(): specify fields to select in the returned documents, only for fetch queries
  • sort_docs(): specify fields by which to sort the returned documents, only for fetch queries
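
As a combined sketch using only the functions introduced above (output omitted), a query might be built like this:

# Sketch: combine an aggregation dimension with a filter; each step
# takes a query object and returns a modified query object.
query_agg(con) %>%
  agg_by_field("tags") %>%
  filter_terms("affectedCountriesIso", "US") %>%
  run()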

Aggregation Queries

Aggregation queries are constructed by doing the following:

Initiating a Query

To initiate an aggregation query, we use the function query_agg(), and pass it our connection object.

query <- query_agg(con)

We can view the query’s translation to an Elasticsearch search body string simply by printing the query.

query
#> {}

Here, of course, the query is empty as we haven’t specified aggregation dimensions yet.

Queries can be executed using the run() function.

run(query)
#> # A tibble: 0 x 0

Since the query is empty, nothing is returned.

Getting a List of Queryable Fields

To begin, it can be helpful to see which fields are available to aggregate on. This can be done by passing the connection object to queryable_fields().

queryable_fields(con)
#> # A tibble: 37 x 2
#>    field                type   
#>    <chr>                <chr>  
#>  1 affectedCountries    keyword
#>  2 affectedCountriesIso keyword
#>  3 childIds             keyword
#>  4 continentCodes       keyword
#>  5 description          text   
#>  6 duplicateId          keyword
#>  7 entityType           keyword
#>  8 flaggedByUserIds     integer
#>  9 fullText             text   
#> 10 id                   integer
#> # … with 27 more rows

Note that aggregations make the most sense when done against categorical variables with bounded cardinality. Keyword fields are typically the most natural choice, but even with keywords, take care to choose fields for which aggregation is meaningful.
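
Since queryable_fields() returns a tibble, one way to narrow the listing to keyword fields is a quick dplyr filter (a convenience sketch; dplyr is an assumption, not a package dependency):

# Sketch: list only keyword fields, usually the most natural
# aggregation targets.
library(dplyr)
queryable_fields(con) %>%
  filter(type == "keyword")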

Aggregating by Fields

Suppose we want to tabulate the frequency of all of the tags in the index. We can do this by adding agg_by_field() to our query, specifying the field name “tags”.

query <- query_agg(con) %>%
  agg_by_field("tags")

The function agg_by_field(), like all of the query-modifying functions that follow, takes a query object as input and emits a modified query object as output. This makes these functions suitable for piping, which is a convenient and expressive way to build queries.

To see what this new query looks like:

query
#> {
#>   "size": 0,
#>   "aggs": {
#>     "agg_results": {
#>       "composite": {
#>         "size": 1000,
#>         "sources": [
#>           {
#>             "tags": {
#>               "terms": {
#>                 "field": "tags"
#>               }
#>             }
#>           }
#>         ]
#>       }
#>     }
#>   }
#> }

Note that aggregation queries use composite aggregation with paging; running the query automatically issues repeated requests until paging is complete and binds the results together, saving a lot of tedious work.

We can retrieve the result of this query by calling run().

run(query)
#> # A tibble: 912 x 2
#>    tags                                 count
#>    <chr>                                <int>
#>  1 f:10:All MSF Categories               6010
#>  2 f:10:Biological                       5060
#>  3 f:10:CWA                                 2
#>  4 f:10:Chemical                           64
#>  5 f:10:Disasters                         982
#>  6 f:10:HPV_grouped                         5
#>  7 f:10:HPV_notgrouped                     14
#>  8 f:10:Scenarios                           9
#>  9 f:10:zAll Hazards Threats (optional)    25
#> 10 f:11:All UNICEF Categories            5098
#> # … with 902 more rows

We can continue to add more dimensions to the aggregation using pipes. For example, to count the frequency of both the fields “tags” and “affectedCountriesIso”:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_field("affectedCountriesIso") %>%
  run()
#> # A tibble: 54,595 x 3
#>    tags                    affectedCountriesIso count
#>    <chr>                   <chr>                <int>
#>  1 f:10:All MSF Categories AD                      11
#>  2 f:10:All MSF Categories AE                      36
#>  3 f:10:All MSF Categories AF                      64
#>  4 f:10:All MSF Categories AG                       2
#>  5 f:10:All MSF Categories AI                       1
#>  6 f:10:All MSF Categories AL                      13
#>  7 f:10:All MSF Categories AM                      19
#>  8 f:10:All MSF Categories AO                       3
#>  9 f:10:All MSF Categories AR                     236
#> 10 f:10:All MSF Categories AS                       3
#> # … with 54,585 more rows

Aggregating by Date Binning

Suppose we want to get daily counts for each tag in the data. We can use the function agg_by_date(), which aggregates daily by default.

Here, we aggregate on a document’s field “processedOnDate”.

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate") %>%
  run()
#> # A tibble: 4,291 x 3
#>    tags                    processedOnDate     count
#>    <chr>                   <dttm>              <int>
#>  1 f:10:All MSF Categories 2020-03-05 00:00:00    37
#>  2 f:10:All MSF Categories 2020-03-06 00:00:00   416
#>  3 f:10:All MSF Categories 2020-03-08 00:00:00    21
#>  4 f:10:All MSF Categories 2020-03-13 00:00:00   179
#>  5 f:10:All MSF Categories 2020-03-15 00:00:00    69
#>  6 f:10:All MSF Categories 2020-03-16 00:00:00   104
#>  7 f:10:All MSF Categories 2020-03-17 00:00:00  1345
#>  8 f:10:All MSF Categories 2020-03-18 00:00:00  3839
#>  9 f:10:Biological         2020-03-15 00:00:00     1
#> 10 f:10:Biological         2020-03-16 00:00:00     2
#> # … with 4,281 more rows

For finer control over the date binning, we can use functions calendar_interval() and fixed_interval().

For example, to bin on calendar week:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate", calendar_interval("1w")) %>%
  run()
#> # A tibble: 2,033 x 3
#>    tags                    processedOnDate     count
#>    <chr>                   <dttm>              <int>
#>  1 f:10:All MSF Categories 2020-03-02 00:00:00   474
#>  2 f:10:All MSF Categories 2020-03-09 00:00:00   248
#>  3 f:10:All MSF Categories 2020-03-16 00:00:00  5288
#>  4 f:10:Biological         2020-03-09 00:00:00     1
#>  5 f:10:Biological         2020-03-16 00:00:00  5059
#>  6 f:10:CWA                2020-03-16 00:00:00     2
#>  7 f:10:Chemical           2020-03-16 00:00:00    64
#>  8 f:10:Disasters          2020-03-02 00:00:00   474
#>  9 f:10:Disasters          2020-03-09 00:00:00   247
#> 10 f:10:Disasters          2020-03-16 00:00:00   261
#> # … with 2,023 more rows

And to bin on every 10 days:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate", fixed_interval("10d")) %>%
  run()
#> # A tibble: 1,470 x 3
#>    tags                    processedOnDate     count
#>    <chr>                   <dttm>              <int>
#>  1 f:10:All MSF Categories 2020-02-28 00:00:00   474
#>  2 f:10:All MSF Categories 2020-03-09 00:00:00  5536
#>  3 f:10:Biological         2020-03-09 00:00:00  5060
#>  4 f:10:CWA                2020-03-09 00:00:00     2
#>  5 f:10:Chemical           2020-03-09 00:00:00    64
#>  6 f:10:Disasters          2020-02-28 00:00:00   474
#>  7 f:10:Disasters          2020-03-09 00:00:00   508
#>  8 f:10:HPV_grouped        2020-03-09 00:00:00     5
#>  9 f:10:HPV_notgrouped     2020-03-09 00:00:00    14
#> 10 f:10:Scenarios          2020-03-09 00:00:00     9
#> # … with 1,460 more rows

Filtering

We can further modify an aggregation query by specifying filters. Five types of filters are currently available:

  • Range filters: specify a range of values a field can have
  • Terms filters: specify a value or vector of values a field must take
  • Match filters: match a specified string in a field
  • Regexp filters: match a regular expression against a field
  • Simple query string filters: search one or more fields with a query string

Note that filters can apply to both aggregation and fetch queries.

Range Filters

Range filters are specified using filter_range(), naming the field to filter and then providing one or both of the from and to values for the range.

For example, to take our earlier aggregation query and filter it to dates on or after 2020-03-10:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate") %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  run()
#> # A tibble: 2,958 x 3
#>    tags                    processedOnDate     count
#>    <chr>                   <dttm>              <int>
#>  1 f:10:All MSF Categories 2020-03-13 00:00:00   179
#>  2 f:10:All MSF Categories 2020-03-15 00:00:00    69
#>  3 f:10:All MSF Categories 2020-03-16 00:00:00   104
#>  4 f:10:All MSF Categories 2020-03-17 00:00:00  1345
#>  5 f:10:All MSF Categories 2020-03-18 00:00:00  3839
#>  6 f:10:Biological         2020-03-15 00:00:00     1
#>  7 f:10:Biological         2020-03-16 00:00:00     2
#>  8 f:10:Biological         2020-03-17 00:00:00  1317
#>  9 f:10:Biological         2020-03-18 00:00:00  3740
#> 10 f:10:CWA                2020-03-18 00:00:00     2
#> # … with 2,948 more rows

To filter on a full date/time, use a format like the following:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate") %>%
  filter_range("processedOnDate", from = "2020-03-10T10:21:32") %>%
  run()
#> # A tibble: 2,958 x 3
#>    tags                    processedOnDate     count
#>    <chr>                   <dttm>              <int>
#>  1 f:10:All MSF Categories 2020-03-13 00:00:00   179
#>  2 f:10:All MSF Categories 2020-03-15 00:00:00    69
#>  3 f:10:All MSF Categories 2020-03-16 00:00:00   104
#>  4 f:10:All MSF Categories 2020-03-17 00:00:00  1345
#>  5 f:10:All MSF Categories 2020-03-18 00:00:00  3839
#>  6 f:10:Biological         2020-03-15 00:00:00     1
#>  7 f:10:Biological         2020-03-16 00:00:00     2
#>  8 f:10:Biological         2020-03-17 00:00:00  1317
#>  9 f:10:Biological         2020-03-18 00:00:00  3740
#> 10 f:10:CWA                2020-03-18 00:00:00     2
#> # … with 2,948 more rows
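
Both endpoints can also be supplied at once. For example, restricting the aggregation to a bounded window of dates might look like the following (a sketch; output omitted):

# Sketch: a bounded date window using both `from` and `to`.
query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate") %>%
  filter_range("processedOnDate", from = "2020-03-10", to = "2020-03-15") %>%
  run()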

Terms Filters

The function filter_terms() adds a filter to a query that specifies certain values a field must have to be included in the aggregation.

For example, to add to our earlier query, suppose we require that “affectedCountriesIso” must contain “US” or “CA”:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate", calendar_interval("1w")) %>%
  filter_range("processedOnDate", from = "2018-01-01") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  run()
#> # A tibble: 1,638 x 3
#>    tags                    processedOnDate     count
#>    <chr>                   <dttm>              <int>
#>  1 f:10:All MSF Categories 2020-03-02 00:00:00    86
#>  2 f:10:All MSF Categories 2020-03-09 00:00:00    25
#>  3 f:10:All MSF Categories 2020-03-16 00:00:00  1195
#>  4 f:10:Biological         2020-03-16 00:00:00  1169
#>  5 f:10:CWA                2020-03-16 00:00:00     1
#>  6 f:10:Chemical           2020-03-16 00:00:00    17
#>  7 f:10:Disasters          2020-03-02 00:00:00    86
#>  8 f:10:Disasters          2020-03-09 00:00:00    25
#>  9 f:10:Disasters          2020-03-16 00:00:00    37
#> 10 f:10:HPV_grouped        2020-03-16 00:00:00     2
#> # … with 1,628 more rows

Regexp Filters

The function filter_regexp() adds a filter to a query that provides a regular expression a field must match for a document to be included in the results. Note that, unfortunately, Elasticsearch regular expressions are case sensitive.

To aggregate tags but only for documents that have a tag containing “Corona”, for example:

query_agg(con) %>%
  agg_by_field("tags") %>%
  filter_regexp("tags", ".*Corona.*") %>%
  run()
#> # A tibble: 677 x 2
#>    tags                                 count
#>    <chr>                                <int>
#>  1 f:10:All MSF Categories               4899
#>  2 f:10:Biological                       4899
#>  3 f:10:CWA                                 1
#>  4 f:10:Chemical                           27
#>  5 f:10:Disasters                          62
#>  6 f:10:HPV_grouped                         1
#>  7 f:10:HPV_notgrouped                      7
#>  8 f:10:Scenarios                           3
#>  9 f:10:zAll Hazards Threats (optional)     6
#> 10 f:11:All UNICEF Categories            4899
#> # … with 667 more rows

Note that here we are filtering on a field that is an array of values for each document. Because of this, the resulting aggregation will contain more tags than just those that include “Corona”: we are counting all tags present in every article that contains at least one tag matching “Corona”.
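
If only the matching tags themselves are of interest, one option is to post-filter the returned tibble (a sketch using base R’s grepl()):

# Sketch: keep only rows whose tag itself matches "Corona", dropping
# the co-occurring tags pulled in because "tags" is an array field.
res <- query_agg(con) %>%
  agg_by_field("tags") %>%
  filter_regexp("tags", ".*Corona.*") %>%
  run()
res[grepl("Corona", res$tags), ]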

Match Filters

The function filter_match() specifies a filter to only include documents where the specified field contains a match for the provided string.

For example, to further refine our aggregation to only include documents where a match for the string “disease” is found in the full text:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate", calendar_interval("1w")) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  filter_match("fullText", "disease") %>%
  run()
#> # A tibble: 772 x 3
#>    tags                                 processedOnDate     count
#>    <chr>                                <dttm>              <int>
#>  1 f:10:All MSF Categories              2020-03-09 00:00:00     1
#>  2 f:10:All MSF Categories              2020-03-16 00:00:00   196
#>  3 f:10:Biological                      2020-03-16 00:00:00   194
#>  4 f:10:CWA                             2020-03-16 00:00:00     1
#>  5 f:10:Chemical                        2020-03-16 00:00:00     5
#>  6 f:10:Disasters                       2020-03-09 00:00:00     1
#>  7 f:10:Disasters                       2020-03-16 00:00:00     7
#>  8 f:10:HPV_notgrouped                  2020-03-16 00:00:00     2
#>  9 f:10:zAll Hazards Threats (optional) 2020-03-16 00:00:00     2
#> 10 f:11:All UNICEF Categories           2020-03-16 00:00:00   196
#> # … with 762 more rows

Simple query string filters

You can add a simple query string filter using the function filter_sqs(). This allows you to specify a field name or vector of field names to search, along with a query string to search for in those fields.

Below we are searching fields “fullText” and “title” for the specified search string.

docs <- query_fetch(con, max = 100) %>%
  filter_sqs(c("fullText", "title"), "((health | healthcare | \\\"health care\\\" | medical) + (specialist* | provider* | professional* | practitioner* | doctor* | worker* | personnel | staff)) | physician* | \\\"general practitioner\\\" | therapist*") %>%
  select_fields(c("fullText", "title")) %>%
  run()
#> Fetching first 100 of 100 total documents...
#> 100 documents fetched (100%)...

Boolean filter operators

All of the filter_*() functions have a parameter bool that lets you specify the boolean logic for the filter; the supported options correspond to Elasticsearch bool clause types such as “must” and “must_not”. The default is “must”, meaning that the returned documents must match the specified filter.

Below, we are filtering on the field “tags”, asking for documents with the tag “l:WPRO” but without the tag “l:CoronavirusInfection”.

docs <- query_fetch(con, max = 100) %>%
  filter_terms("tags", "l:WPRO") %>%
  filter_terms("tags", "l:CoronavirusInfection", bool = "must_not") %>%
  run()
#> Fetching first 100 of 100 total documents...
#> 100 documents fetched (100%)...

Fetch Queries

Fetch queries simply retrieve documents based on filtering criteria. All of the filtering functions specified above apply to these queries.

Initiating a Query

Similar to aggregation queries, a fetch query is initialized using query_fetch(), which takes as its primary argument the connection object.

One optional argument of note to this function is path, which specifies a directory to write documents to as they are fetched. If this is not specified, results are read into memory. If the result set looks like it will be very large, a warning encourages the user to provide a path and write to disk.

If we initialize a fetch query with no refinements, it will return all documents in the index.

For example, with our example index which contains 10k documents:

docs <- query_fetch(con) %>%
  run()

This will fetch all 10k documents and return them as a large list to docs.

Note that fetch queries automatically take care of scrolling to retrieve potentially very large sets of documents. The scroll limit is 10k documents, so iterative queries are run to fetch these in batches and piece them together upon retrieval.
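
The number of documents retrieved per scroll can be tuned with the size argument to query_fetch(), used again in Fetching to Disk below. For example (a sketch; output omitted):

# Sketch: retrieve all matching documents, 1,000 per scroll batch.
docs <- query_fetch(con, size = 1000) %>%
  run()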

Adding Filters to Fetch Queries

Usually it is more desirable for a fetch query to pinpoint records of interest than to retrieve all documents. This can be done using the filter functions described earlier.

For example, to fetch all documents matching the filtering criteria we specified in the final aggregation example:

docs <- query_fetch(con) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  filter_match("fullText", "disease") %>%
  run()
#> Fetching 288 total documents...
#> 288 documents fetched (100%)...

Checking to see how large query results will be

Sometimes we may wish to see how large a query’s result will be before executing it. To do this, we can replace run() with n_docs(). To make query execution even faster, we can specify max = 0 so that no documents are fetched.

query_fetch(con, max = 0) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  filter_match("fullText", "disease") %>%
  n_docs()
#> [1] 288

Fetching a subset of records

There is an argument max in query_fetch() that can be used to specify that we just want to fetch the first max documents. This can be useful if we want to experiment with our query and the resulting data before doing a full fetch.

docs <- query_fetch(con, max = 10) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  filter_match("fullText", "disease") %>%
  run()
#> Fetching first 10 of 10 total documents...
#> 10 documents fetched (100%)...

Fetching to Disk

In the previous fetch examples, the returned object docs has been a list containing the document content returned by the query.

In many cases we may wish to do a bulk download of many articles. If we specify a path argument to query_fetch(), the results will be written in batches to the specified directory.

For example, to write our last query to disk, we specify a directory in our query initialization. Also, note that to illustrate scrolling, we use the size argument to retrieve 10 documents per scroll (instead of the default 10k). With this, we see that 29 files get written, one for each scroll.

tf <- tempfile()
dir.create(tf)
docs <- query_fetch(con, path = tf, size = 10) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  filter_match("fullText", "disease") %>%
  run()
#> Fetching 288 total documents...
#> 10 documents fetched (3%)...
#> 20 documents fetched (7%)...
#> 30 documents fetched (10%)...
#> 40 documents fetched (14%)...
#> 50 documents fetched (17%)...
#> 60 documents fetched (21%)...
#> 70 documents fetched (24%)...
#> 80 documents fetched (28%)...
#> 90 documents fetched (31%)...
#> 100 documents fetched (35%)...
#> 110 documents fetched (38%)...
#> 120 documents fetched (42%)...
#> 130 documents fetched (45%)...
#> 140 documents fetched (49%)...
#> 150 documents fetched (52%)...
#> 160 documents fetched (56%)...
#> 170 documents fetched (59%)...
#> 180 documents fetched (62%)...
#> 190 documents fetched (66%)...
#> 200 documents fetched (69%)...
#> 210 documents fetched (73%)...
#> 220 documents fetched (76%)...
#> 230 documents fetched (80%)...
#> 240 documents fetched (83%)...
#> 250 documents fetched (87%)...
#> 260 documents fetched (90%)...
#> 270 documents fetched (94%)...
#> 280 documents fetched (97%)...
#> 288 documents fetched (100%)...

list.files(docs)
#>  [1] "out0001.json" "out0002.json" "out0003.json" "out0004.json" "out0005.json"
#>  [6] "out0006.json" "out0007.json" "out0008.json" "out0009.json" "out0010.json"
#> [11] "out0011.json" "out0012.json" "out0013.json" "out0014.json" "out0015.json"
#> [16] "out0016.json" "out0017.json" "out0018.json" "out0019.json" "out0020.json"
#> [21] "out0021.json" "out0022.json" "out0023.json" "out0024.json" "out0025.json"
#> [26] "out0026.json" "out0027.json" "out0028.json" "out0029.json"

Specifying fields to sort on

Another operation available only for fetch queries is sort_docs(), which allows you to specify fields to sort by as part of the fetch.

For example:

docs <- query_fetch(con, size = 10, max = 25) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  sort_docs("processedOnDate") %>%
  run()
#> Fetching 25 total documents...
#> 10 documents fetched (40%)...
#> 20 documents fetched (80%)...
#> 25 documents fetched (100%)...

sapply(docs, function(x) x$`_source`$processedOnDate)
#>  [1] "2020-03-13T07:26:04.0422415Z" "2020-03-13T07:26:07.0304637Z"
#>  [3] "2020-03-13T07:26:07.8927456Z" "2020-03-13T07:26:09.0216872Z"
#>  [5] "2020-03-13T07:28:01.4437200Z" "2020-03-13T07:28:02.9752079Z"
#>  [7] "2020-03-13T07:28:13.5119956Z" "2020-03-13T07:28:15.0721117Z"
#>  [9] "2020-03-13T07:30:03.4775738Z" "2020-03-13T07:30:05.2785935Z"
#> [11] "2020-03-13T07:30:08.0987420Z" "2020-03-13T07:31:08.4420386Z"
#> [13] "2020-03-13T07:31:08.7470622Z" "2020-03-13T07:32:10.5883424Z"
#> [15] "2020-03-13T07:32:10.9902103Z" "2020-03-13T07:32:15.2091531Z"
#> [17] "2020-03-13T07:32:18.7561610Z" "2020-03-13T07:33:02.1066126Z"
#> [19] "2020-03-13T07:33:05.4555161Z" "2020-03-13T07:33:06.1505288Z"
#> [21] "2020-03-13T07:34:08.6844120Z" "2020-03-13T07:35:05.6037369Z"
#> [23] "2020-03-13T07:35:09.8630094Z" "2020-03-13T07:35:10.7938444Z"
#> [25] "2020-03-13T07:37:01.3067962Z"

You can provide a vector of field names for nested sorting.
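
For example, sorting first by date and then by id might look like the following (a sketch; output omitted):

# Sketch: nested sort, first by processedOnDate, then by id.
docs <- query_fetch(con, size = 10, max = 25) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  sort_docs(c("processedOnDate", "id")) %>%
  run()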

You can also wrap fields with utility functions asc() and desc() to specify the sort order:

docs <- query_fetch(con, size = 10, max = 25) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  sort_docs(desc("processedOnDate")) %>%
  run()
#> Fetching 25 total documents...
#> 10 documents fetched (40%)...
#> 20 documents fetched (80%)...
#> 25 documents fetched (100%)...

sapply(docs, function(x) x$`_source`$processedOnDate)
#>  [1] "2020-03-18T06:48:04.0678061Z" "2020-03-18T06:48:01.2303682Z"
#>  [3] "2020-03-18T06:47:12.4272729Z" "2020-03-18T06:47:06.5424375Z"
#>  [5] "2020-03-18T06:47:01.1790261Z" "2020-03-18T06:47:00.6332482Z"
#>  [7] "2020-03-18T06:46:04.3693800Z" "2020-03-18T06:46:04.0533870Z"
#>  [9] "2020-03-18T06:46:03.3320970Z" "2020-03-18T06:46:02.3600512Z"
#> [11] "2020-03-18T06:46:02.1100528Z" "2020-03-18T06:46:01.0043784Z"
#> [13] "2020-03-18T06:45:05.6178085Z" "2020-03-18T06:45:05.1447041Z"
#> [15] "2020-03-18T06:45:03.3038090Z" "2020-03-18T06:45:02.9738038Z"
#> [17] "2020-03-18T06:45:01.7208323Z" "2020-03-18T06:45:00.7528565Z"
#> [19] "2020-03-18T06:44:09.2082822Z" "2020-03-18T06:44:08.9183509Z"
#> [21] "2020-03-18T06:44:04.7698113Z" "2020-03-18T06:44:04.2997752Z"
#> [23] "2020-03-18T06:44:03.6822691Z" "2020-03-18T06:44:01.9492661Z"
#> [25] "2020-03-18T06:44:01.4906573Z"

Specifying fields to return

An operation available only for fetch queries, select_fields(), allows us to specify which fields should be returned for each document. This is useful if documents contain some fields that are very large and we don’t want to include them in our results.

To see which fields can be selected:

selectable_fields(con)
#> # A tibble: 45 x 2
#>    field                type   
#>    <chr>                <chr>  
#>  1 affectedCountries    keyword
#>  2 affectedCountriesIso keyword
#>  3 childIds             keyword
#>  4 comments             nested 
#>  5 continentCodes       keyword
#>  6 description          text   
#>  7 duplicateId          keyword
#>  8 entityType           keyword
#>  9 flaggedByUserIds     integer
#> 10 fullText             text   
#> # … with 35 more rows

For example, to return just the fields “source.id”, “triggers”, and “locations”:

docs <- query_fetch(con, size = 10, max = 25) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  select_fields(c("source.id", "triggers", "locations")) %>%
  run()
#> Fetching 25 total documents...
#> 10 documents fetched (40%)...
#> 20 documents fetched (80%)...
#> 25 documents fetched (100%)...

str(docs[[1]]$`_source`, 2)
#> List of 3
#>  $ locations:List of 2
#>   ..$ :List of 8
#>   ..$ :List of 9
#>  $ source   :List of 1
#>   ..$ id: int 11786
#>  $ triggers :List of 4
#>   ..$ :List of 2
#>   ..$ :List of 2
#>   ..$ :List of 2
#>   ..$ :List of 2

Ad hoc Queries

The interface provided by query_fetch(), query_agg(), and the accompanying filter and sort functions is meant to enable clean and precise definition of Elasticsearch queries, hiding the messy details of scrolling, aggregation bucketing, query JSON specification, execution, etc. There may be times when this interface does not provide enough flexibility. For these cases, the function query_str() allows you to provide a string specifying an Elasticsearch query to execute.

For example, suppose we would like to fetch all documents processed on or after 2020-03-10. As we have seen before, we can do this with the following:

docs <- query_fetch(con, size = 1000) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  run()

Suppose we want to run a query similar to this, but we need to modify the query a bit in a way that isn’t available through the interface provided in this package. Let’s first look at what the resulting query string is:

query_fetch(con, size = 1000) %>%
  filter_range("processedOnDate", from = "2020-03-10")
#> {
#>   "size": 1000,
#>   "query": {
#>     "bool": {
#>       "must": [
#>         {
#>           "range": {
#>             "processedOnDate": {
#>               "gte": "2020-03-10"
#>             }
#>           }
#>         }
#>       ]
#>     }
#>   }
#> }

For illustration purposes, we will take this string and run it nearly verbatim, with one small change: the “must” clause is swapped for an equivalent “filter” clause, which matches the same documents but skips relevance scoring. To run this query string, we can do the following:

str <- '{
  "size": 1000,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "processedOnDate": {
              "gte": "2020-03-10"
            }
          }
        }
      ]
    }
  }
}'

res <- query_str(con, str = str) %>%
  run()

Here we simply ran the query specified by this string. The structure of the output looks like this:

str(res, 2)
#> List of 4
#>  $ took     : int 43
#>  $ timed_out: logi FALSE
#>  $ _shards  :List of 4
#>   ..$ total     : int 1
#>   ..$ successful: int 1
#>   ..$ skipped   : int 0
#>   ..$ failed    : int 0
#>  $ hits     :List of 3
#>   ..$ total    :List of 2
#>   ..$ max_score: num 0
#>   ..$ hits     :List of 1000

Note that this is the raw output provided by Elasticsearch and not the more convenient list output provided by query_fetch(). Also note that it only pulled 1000 documents when we know there are more results for this query. These limitations occur because query_str() does not know what kind of query specification it is going to receive, and therefore does not know how to best process the output.

To fully take advantage of the fetch and aggregation conveniences provided by this package, an additional type parameter is available to query_str(), which by default is “unknown” but can also be set to “fetch” or “agg”.

With our current example, if we add type = "fetch" to our call to query_str(), the execution now knows to handle the query as a fetch query and will take care of scrolling, etc. to fetch all of the documents.

res <- query_str(con, str = str, type = "fetch") %>%
  run()
#> Fetching 8136 total documents...
#> 1000 documents fetched (12%)...
#> 2000 documents fetched (25%)...
#> 3000 documents fetched (37%)...
#> 4000 documents fetched (49%)...
#> 5000 documents fetched (61%)...
#> 6000 documents fetched (74%)...
#> 7000 documents fetched (86%)...
#> 8000 documents fetched (98%)...
#> 8136 documents fetched (100%)...

length(res)
#> [1] 8136

While this example wasn’t very imaginative, suppose there is more complex filter logic you want to apply. A typical use case might be to use query_fetch() and the associated filter/sort functions to get started, modify the resulting query string as needed, and pass that to query_str(). Note, however, that there is less protection in this scenario from errors in the query specification, so expect more error messages and the need to debug your query string by hand.
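
A sketch of that workflow, using base R’s capture.output() to grab the printed JSON (the capture step is an assumption; the package may expose the query string more directly):

# Sketch: build a query, capture its JSON translation, tweak it by
# hand, and run the result through query_str().
q <- query_fetch(con, size = 1000) %>%
  filter_range("processedOnDate", from = "2020-03-10")
json <- paste(capture.output(print(q)), collapse = "\n")
# ...modify `json` as needed, then:
res <- query_str(con, str = json, type = "fetch") %>%
  run()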

Similarly, we can apply query_str() to aggregation-type queries. Let’s take the following example:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_field("affectedCountriesIso")
#> {
#>   "size": 0,
#>   "aggs": {
#>     "agg_results": {
#>       "composite": {
#>         "size": 1000,
#>         "sources": [
#>           {
#>             "tags": {
#>               "terms": {
#>                 "field": "tags"
#>               }
#>             }
#>           },
#>           {
#>             "affectedCountriesIso": {
#>               "terms": {
#>                 "field": "affectedCountriesIso"
#>               }
#>             }
#>           }
#>         ]
#>       }
#>     }
#>   }
#> }

We can run this same query using query_str() as follows:

str <- '{
  "size": 0,
  "aggs": {
    "agg_results": {
      "composite": {
        "size": 1000,
        "sources": [
          {
            "tags": {
              "terms": {
                "field": "tags"
              }
            }
          },
          {
            "affectedCountriesIso": {
              "terms": {
                "field": "affectedCountriesIso"
              }
            }
          }
        ]
      }
    }
  }
}'

res <- query_str(con, str = str) %>%
  run()

str(res, 3)
#> List of 5
#>  $ took        : int 0
#>  $ timed_out   : logi FALSE
#>  $ _shards     :List of 4
#>   ..$ total     : int 1
#>   ..$ successful: int 1
#>   ..$ skipped   : int 0
#>   ..$ failed    : int 0
#>  $ hits        :List of 3
#>   ..$ total    :List of 2
#>   .. ..$ value   : int 10000
#>   .. ..$ relation: chr "eq"
#>   ..$ max_score: NULL
#>   ..$ hits     : list()
#>  $ aggregations:List of 1
#>   ..$ agg_results:List of 2
#>   .. ..$ after_key:List of 2
#>   .. ..$ buckets  :List of 1000

As before, we see that query_str() is “dumb” in that it doesn’t know we are doing an aggregation, so it neither iterates to fetch the remaining aggregation buckets nor puts the result into a data frame.

To make query_str() “smart”, we can add type = "agg":

res <- query_str(con, str = str, type = "agg") %>%
  run()

res
#> # A tibble: 54,595 x 3
#>    tags                    affectedCountriesIso count
#>    <chr>                   <chr>                <int>
#>  1 f:10:All MSF Categories AD                      11
#>  2 f:10:All MSF Categories AE                      36
#>  3 f:10:All MSF Categories AF                      64
#>  4 f:10:All MSF Categories AG                       2
#>  5 f:10:All MSF Categories AI                       1
#>  6 f:10:All MSF Categories AL                      13
#>  7 f:10:All MSF Categories AM                      19
#>  8 f:10:All MSF Categories AO                       3
#>  9 f:10:All MSF Categories AR                     236
#> 10 f:10:All MSF Categories AS                       3
#> # … with 54,585 more rows

Limitations

This package is experimental and has not undergone rigorous testing to verify the correctness of the constructed queries. Use at your own risk.

The package has been written to cover a large number of immediate use cases. However, there are many additional features and parameters of Elasticsearch that could be exposed through this interface in the future.