Building a new parser

yujia21 edited this page Feb 23, 2024 · 27 revisions

Electricity Maps parsers

A parser is a Python script that defines one or more functions that fetch data for a particular zone and/or exchange.

A parser can be built if there is a public URL that contains electricity generation data for a specific zone. See the Technical Requirements for Parsers to verify that the source you have found does indeed fit these requirements.

If you're looking to contribute, but don't have a specific zone in mind, you can take a look at the Missing Countries Overview which has information on where we think a parser might be buildable.

Parser validation

In 2023, we implemented a new data structure to be returned by the parsers. The aim of this new structure is to make sure that all parsed data is compliant with the expected format. This prevents errors further down the pipeline.

Return value signature

A parser should return an EventList object. An Event object represents any kind of grid event that we are capable of processing. The backend will automatically update past values properly.

The Event class is abstract and is used to create the following sub-classes, representing the different kinds of data we currently accept:

  • Exchange
  • TotalProduction
  • ProductionBreakdown
  • TotalConsumption
  • Price

In cases where the parser function collects data for more than one datetime, the data should be stored in the following objects:

  • ExchangeList
  • TotalProductionList
  • ProductionBreakdownList
  • TotalConsumptionList
  • PriceList

Each event has a sourceType attribute:

  • measured
  • estimated
  • forecasted

By default, the sourceType is set to measured, so this attribute does not need to be set explicitly. For forecasted data (e.g. a consumption forecast), the sourceType needs to be set to forecasted when the event is created.

Each event represents a data point that provides the value for a given datetime. The different Event objects are found here. The expected format for a production parser event follows:

ProductionBreakdown.create(
    logger=logger,
    datetime=datetime(2023, 5, 8, 16, 34, tzinfo=ZoneInfo('timezone')),
    zoneKey=ZoneKey('SOME_ZONE'),
    source='mysource.com',
    production=ProductionMix(wind=7, solar=8, hydro=10)
)

The expected format for a consumption parser event follows:

TotalConsumption(
    datetime=datetime(2023, 5, 8, 16, 34, tzinfo=ZoneInfo('timezone')),
    zoneKey=ZoneKey('SOME_ZONE'),
    source='mysource.com',
    consumption=1234.56
)

A zone key identifier is required for each parser event. This identifier needs to be included in ZONES_CONFIG or EXCHANGES_CONFIG. The zone key object can be imported from config.

from electricitymap.contrib.config import ZoneKey
ProductionBreakdown(zoneKey=ZoneKey('DE'), **values)

and for exchange parsers, the zone identifier is the two zone keys sorted alphabetically and joined by ->:

Exchange(zoneKey=ZoneKey('DE->FR'), **values)

For historical reasons, we currently need to convert the EventList object to a list of plain dictionaries. This is done with the to_list() method of the EventList class. This requirement will be removed at a later stage, once all parsers use the same structure.

return ProductionBreakdownList(logger).to_list()

All timezones should either be a zoneinfo.ZoneInfo object, or datetime.timezone.utc if the returned datetime is in UTC; we no longer accept parsers that use pytz or arrow for timezones. These can be imported like this:

from datetime import timezone
from zoneinfo import ZoneInfo

Then they can be used like the following if the data has a specific timezone:

Price(
  datetime=datetime.fromisoformat("2023-11-13T10:30").replace(tzinfo=ZoneInfo('Europe/Copenhagen')),
  **otherValues
)

or like this if the time is UTC:

Price(
  datetime=datetime.fromisoformat("2023-11-13T10:30").replace(tzinfo=timezone.utc),
  **otherValues
)

Parser arguments

All parsers must contain the following arguments:

  • zone key information: zone_key if the parser fetches data for a single zone; zone_key1 and zone_key2 if the parser fetches data for an exchange.
  • session: a Python requests Session that you can re-use to make HTTP requests.
  • target_datetime: used to fetch historical data when available.
  • logger: a logging.Logger whose output is publicly available, so that anyone can monitor that the parsers are functioning correctly.

Parser template

To help you get started, you can find a parser template here as well as an example to build a fetch_production function.

See below for the complete signatures.

Parser functions

fetch_consumption

Return the consumption at the current time as a TotalConsumption object if only one data point is available or a TotalConsumptionList if the function parses data for multiple datetimes.

The consumption values (MW) should never be negative.

def fetch_consumption(
    zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
    session = session or Session()
    all_consumption_events = ...  # all events with a datetime and a consumption value
    consumption_list = TotalConsumptionList(logger)
    for event in all_consumption_events:
        consumption_list.append(
            zoneKey=zone_key,
            datetime=event.datetime,
            consumption=event.consumption_value,
            source='mysource.com'
        )
    return consumption_list.to_list()

fetch_price

Return the price per MWh at the current time as a Price object if only one data point is available or a PriceList if the function parses data for multiple datetimes.

The currency value should be a three-letter string representing the currency of the price. View the code-symbol mapping from the currency-symbol-map node package.

Price values are per MWh and can be both positive and negative. They should, when possible, represent the day-ahead price of the zone.

def fetch_price(
    zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
    session = session or Session()
    all_price_events = ...  # all events with a datetime and a price value
    price_list = PriceList(logger)
    for event in all_price_events:
        price_list.append(
            zoneKey=zone_key,
            datetime=event.datetime,
            currency='EUR',
            price=event.price_value,
            source='mysource.com'
        )
    return price_list.to_list()

fetch_production

Here, the Event kind is ProductionBreakdown as the function returns the production mix at the current time. It can either be a ProductionBreakdown with the below fields if there is only one data point available, or a ProductionBreakdownList if multiple time values can be fetched.

The production values (MW) should never be negative. Use None, or omit the key, if a specific production mode is not known.

Storage values can be either positive (when energy is stored) or negative (when the storage is discharged).

def fetch_production(
    zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
    session = session or Session()
    all_production_events = ...  # all events with a datetime and a production breakdown and/or a storage breakdown
    production_list = ProductionBreakdownList(logger)
    for event in all_production_events:
        production_list.append(
            zoneKey=zone_key,
            datetime=event.datetime,
            production=event.production_mix,
            storage=event.storage_mix,
            source='mysource.com'
        )
    return production_list.to_list()

fetch_consumption_forecast

The function returns a TotalConsumption object. If there are multiple datapoints available, the function should return a TotalConsumptionList.

The sourceType is EventSourceType.forecasted.

def fetch_consumption_forecast(
    zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
    session = session or Session()
    all_consumption_events = ...  # all events with a datetime and a consumption value
    consumption_list = TotalConsumptionList(logger)
    for event in all_consumption_events:
        consumption_list.append(
            zoneKey=zone_key,
            datetime=event.datetime,
            consumption=event.consumption_value,
            source='mysource.com',
            sourceType=EventSourceType.forecasted
        )
    return consumption_list.to_list()

fetch_generation_forecast

Returns data for total generation. The Event kind is TotalProduction as no power breakdown is available. If there are multiple events available, the function should return a TotalProductionList.

The sourceType is EventSourceType.forecasted

def fetch_generation_forecast(
    zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
    session = session or Session()
    all_generation_events = ...  # all events with a datetime and a generation value
    generation_list = TotalProductionList(logger)
    for event in all_generation_events:
        generation_list.append(
            zoneKey=zone_key,
            datetime=event.datetime,
            value=event.generation_value,
            source='mysource.com',
            sourceType=EventSourceType.forecasted
        )
    return generation_list.to_list()

fetch_wind_solar_forecasts

In this case, as the function collects forecasted production by mode, it returns a ProductionBreakdown object. If there are multiple datapoints available, the function should return a ProductionBreakdownList.

As the data is forecasted, the sourceType is EventSourceType.forecasted

def fetch_wind_solar_forecasts(
    zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
    session = session or Session()
    all_production_events = ...  # all events with a datetime and a production breakdown
    production_list = ProductionBreakdownList(logger)
    for event in all_production_events:
        production_list.append(
            zoneKey=zone_key,
            datetime=event.datetime,
            production=event.production_mix,
            source='mysource.com',
            sourceType=EventSourceType.forecasted
        )
    return production_list.to_list()

Exchange functions

fetch_exchange

Return the cross-border flow at the current time as an Exchange or an ExchangeList if data is available for multiple datetimes.

The sortedZoneKeys value should be a string in the format zone_keyA->zone_keyB, where the two zone keys are sorted alphabetically.

The netFlow value can be positive or negative, indicating the direction of the flow. Respecting the alphabetical sort of the zone keys, a positive value means the first zone is exporting to the second zone, while a negative value means the first zone is importing from the second zone.
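The sorting and sign convention can be sketched in plain Python (the zone keys and the 100 MW figure here are illustrative values, not real data):

```python
# Illustrative sketch of the exchange-key sorting and netFlow sign convention.
zone_key1, zone_key2 = "FR", "DE"

# The exchange identifier always uses the alphabetically sorted keys.
sorted_zone_keys = "->".join(sorted([zone_key1, zone_key2]))
print(sorted_zone_keys)  # DE->FR

# For "DE->FR", a positive netFlow means DE exports to FR,
# and a negative netFlow means DE imports from FR.
net_flow = 100  # MW flowing from DE to FR (illustrative value)
direction = "export" if net_flow >= 0 else "import"
print(f"DE {direction}s {abs(net_flow)} MW")  # DE exports 100 MW
```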

def fetch_exchange(
    zone_key1: ZoneKey,
    zone_key2: ZoneKey,
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
    session = session or Session()
    sortedZoneKeys = '->'.join(sorted([zone_key1, zone_key2]))
    all_exchange_events = ...  # all events with a datetime and an exchange value
    exchange_list = ExchangeList(logger)
    for event in all_exchange_events:
         exchange_list.append(
           zoneKey=ZoneKey(sortedZoneKeys),
           datetime=event.datetime,
           netFlow=event.exchange_value,
           source='mysource.com'
         )
    return exchange_list.to_list()

fetch_exchange_forecast

Return the cross-border flow at the current time as an Exchange, or an ExchangeList if data is available for multiple datetimes. As the data is forecasted, the sourceType is EventSourceType.forecasted.

def fetch_exchange_forecast(
    zone_key1: ZoneKey,
    zone_key2: ZoneKey,
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
    session = session or Session()
    sortedZoneKeys = '->'.join(sorted([zone_key1, zone_key2]))
    all_exchange_events = ...  # all events with a datetime and an exchange value
    exchange_list = ExchangeList(logger)
    for event in all_exchange_events:
        exchange_list.append(
            zoneKey=ZoneKey(sortedZoneKeys),
            datetime=event.datetime,
            netFlow=event.exchange_value,
            source='mysource.com',
            sourceType=EventSourceType.forecasted
        )
    return exchange_list.to_list()

Final steps

Once you're done, add your parser to the zones.json and exchanges.json configuration files. Finally, update the real-time sources.
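As a rough, hypothetical sketch, a zones.json entry wiring up new parser functions could look like the following (the zone key, module name, and set of functions are placeholders; check existing entries in the repository for the exact schema):

```json
"SOME_ZONE": {
  "parsers": {
    "production": "SOME_ZONE.fetch_production",
    "consumption": "SOME_ZONE.fetch_consumption",
    "price": "SOME_ZONE.fetch_price"
  }
}
```

Each parser reference is the parser file's module name followed by the function name, so the backend knows which function to call for each data kind.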

After setting up your local development environment, you can run all of the parser tests with the following command from the root directory:

poetry run test

For more info, check out the example parser or browse existing parsers.

Test parsers locally

  1. Set up your local development environment

  2. Ensure dependencies are installed:

    poetry install -E parsers
  3. From the root folder, use the test_parser.py command line utility:

    poetry run test_parser FR price  # get latest price parser for France
    poetry run test_parser FR  # defaults to production if no data type is given
    
    # test a specific datetime (parser needs to be able to fetch past datetimes)
    poetry run test_parser DE --target_datetime 2018-01-01T08:00
    poetry run test_parser "CH->FR" exchange # get the exchange data between Switzerland & France

Many of the tests require API keys for the data or web service providers, and therefore fail with an error message like:

Exception: No ENTSOE_TOKEN found! Please add it into secrets.env!

In such cases, browse the provider's website and request an API key. Once you have the key, set it as an environment variable; this fixes the error.
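For the ENTSOE token mentioned above, for example, you can either export it in your current shell or append it to the secrets.env file that the error message refers to (the token value below is a placeholder, and the file is assumed to live at the repository root):

```shell
# Option 1: export the token for the current shell session only
export ENTSOE_TOKEN=your-token-here

# Option 2: append it to secrets.env so it is picked up automatically
echo 'ENTSOE_TOKEN=your-token-here' >> secrets.env
```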
